package org.appng.core.controller.messaging;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.StringUtils;
import org.appng.api.messaging.Event;
import org.appng.api.messaging.EventHandler;
import org.appng.api.messaging.EventRegistry;
import org.appng.api.messaging.Receiver;
import org.appng.api.messaging.Serializer;
import org.appng.api.model.Site;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/appng-core-1.18.0-RC1.jar:org/appng/core/controller/messaging/RabbitMQReceiver.class */
/**
 * A {@link Receiver} that consumes serialized {@link Event}s from a RabbitMQ queue and dispatches them to the
 * {@link EventHandler}s known to its {@link EventRegistry}.
 * <p>
 * On {@link #runWith(ExecutorService)} the receiver makes sure a fanout exchange and a queue named
 * {@code <nodeId>@<exchange>} exist, binds the queue to the exchange and starts consuming. The queue flags are read
 * from the platform configuration:
 * <ul>
 * <li>{@code rabbitMQDurableQueue} (default {@code false})</li>
 * <li>{@code rabbitMQExclusiveQueue} (default {@code true})</li>
 * <li>{@code rabbitMQAutoDeleteQueue} (default {@code true})</li>
 * </ul>
 */
public class RabbitMQReceiver extends RabbitMQBase implements Receiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQReceiver.class);
    private static final String RABBIT_MQ_AUTO_DELETE_QUEUE = "rabbitMQAutoDeleteQueue";
    private static final String RABBIT_MQ_DURABLE_QUEUE = "rabbitMQDurableQueue";
    private static final String RABBIT_MQ_EXCLUSIVE_QUEUE = "rabbitMQExclusiveQueue";

    /** Holds the handlers that incoming events are dispatched to. */
    private final EventRegistry eventRegistry = new EventRegistry();

    /** Result of the (passive or active) queue declaration; provides the effective queue name. */
    private AMQP.Queue.DeclareOk queueDeclare;

    /**
     * Consumer that deserializes each delivery into an {@link Event} and hands it to the registered handlers,
     * unless the event originates from this very node.
     */
    class EventConsumer extends DefaultConsumer {
        public EventConsumer(Channel channel) {
            super(channel);
        }

        /**
         * Deserializes the message body and dispatches the resulting event.
         * <p>
         * Events that cannot be deserialized, and events whose origin node ID equals the ID of the current node,
         * are ignored. Any exception raised while resolving the site or executing a handler is logged and not
         * propagated.
         *
         * @param consumerTag the consumer tag of this consumer (unused)
         * @param envelope    packaging data of the message (unused)
         * @param properties  content header data of the message (unused)
         * @param body        the serialized event
         * @throws IOException declared by the overridden method
         */
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
            Event event = eventSerializer.deserialize(body);
            if (null == event) {
                LOGGER.debug("could not read event");
                return;
            }
            try {
                Site site = eventSerializer.getSite(event.getSiteName());
                String currentNode = eventSerializer.getNodeId();
                String originNode = event.getNodeId();
                LOGGER.debug("current node: {}, originNode node: {}", currentNode, originNode);
                if (StringUtils.equals(currentNode, originNode)) {
                    LOGGER.debug("message is from myself and can be ignored");
                } else {
                    LOGGER.info("about to execute {} ", event);
                    for (EventHandler<Event> handler : eventRegistry.getHandlers(event)) {
                        handler.onEvent(event, eventSerializer.getEnvironment(), site);
                    }
                }
            } catch (Exception e) {
                // Boundary of the consumer callback: log and swallow so that one failing event is not rethrown
                // to the RabbitMQ client. A trailing Throwable argument is logged as the exception by SLF4J.
                LOGGER.error("Error while executing event {}", event, e);
            }
        }

        /** Returns the simple class name followed by the receiver's current channel. */
        @Override
        public String toString() {
            return getClass().getSimpleName() + " (" + RabbitMQReceiver.this.channel + ")";
        }
    }

    /**
     * Stores the serializer and initializes the underlying RabbitMQ resources.
     *
     * @param serializer the serializer used to deserialize events and to access node/platform information
     * @return this receiver
     */
    @Override
    public Receiver configure(Serializer serializer) {
        this.eventSerializer = serializer;
        initialize("appng-rabbitmq-receiver-%d");
        return this;
    }

    /**
     * Creates a {@link RabbitMQSender} configured with the same serializer as this receiver.
     *
     * @return the new sender
     */
    @Override
    public RabbitMQSender createSender() {
        return new RabbitMQSender().configure(this.eventSerializer);
    }

    /**
     * Declares exchange and queue (if they do not exist yet), binds the queue to the exchange and starts consuming
     * it with an {@link EventConsumer}.
     *
     * @param executorService not used by this implementation
     * @throws IllegalStateException if the exchange or the queue can not be declared, or if binding/consuming fails
     */
    @Override
    public void runWith(ExecutorService executorService) {
        ensureExchangeDeclared();
        ensureQueueDeclared();
        bindQueueAndConsume();
    }

    /** Checks passively for the exchange and declares it as a durable fanout exchange if that check fails. */
    private void ensureExchangeDeclared() {
        try {
            this.channel.exchangeDeclarePassive(this.exchange);
            log().info("Exchange {} already exists.", this.exchange);
        } catch (IOException passiveFailure) {
            // The failed passive declaration is the expected "does not exist" case and is intentionally not
            // reported. A fresh channel is opened before declaring, since RabbitMQ closes a channel on which a
            // passive declaration failed.
            try {
                log().info("Declaring exchange '{}' (type: {})", this.exchange, BuiltinExchangeType.FANOUT);
                this.channel = this.connection.createChannel();
                this.channel.exchangeDeclare(this.exchange, BuiltinExchangeType.FANOUT, true);
            } catch (IOException declareFailure) {
                throw new IllegalStateException("Error declaring exchange.", declareFailure);
            }
        }
    }

    /**
     * Checks passively for the queue and declares it with the configured flags if that check fails. The queue name
     * is {@code <nodeId>@<exchange>}, or the empty string if the serializer provides no node ID.
     */
    private void ensureQueueDeclared() {
        String nodeId = this.eventSerializer.getNodeId();
        String queueName = null == nodeId ? "" : String.format("%s@%s", nodeId, this.exchange);
        try {
            this.queueDeclare = this.channel.queueDeclarePassive(queueName);
            log().info("Queue {} already exists.", queueName);
        } catch (IOException passiveFailure) {
            try {
                boolean durable = this.eventSerializer.getPlatformConfig().getBoolean(RABBIT_MQ_DURABLE_QUEUE, false);
                boolean exclusive = this.eventSerializer.getPlatformConfig().getBoolean(RABBIT_MQ_EXCLUSIVE_QUEUE, true);
                boolean autoDelete = this.eventSerializer.getPlatformConfig().getBoolean(RABBIT_MQ_AUTO_DELETE_QUEUE,
                        true);
                log().info("Declaring queue '{}'.", queueName);
                // As for the exchange: continue on a fresh channel after the failed passive declaration.
                this.channel = this.connection.createChannel();
                this.queueDeclare = this.channel.queueDeclare(queueName, durable, exclusive, autoDelete, null);
            } catch (IOException declareFailure) {
                // Report the failure of the actual declaration as the cause; the preceding passive failure is
                // kept as a suppressed exception so no diagnostic information is lost.
                declareFailure.addSuppressed(passiveFailure);
                throw new IllegalStateException("Error declaring queue.", declareFailure);
            }
        }
    }

    /** Binds the declared queue to the exchange (empty routing key) and starts an auto-acknowledging consumer. */
    private void bindQueueAndConsume() {
        try {
            String queueName = this.queueDeclare.getQueue();
            log().info("Binding queue '{}' to exchange {}", queueName, this.exchange);
            this.channel.queueBind(queueName, this.exchange, "");
            EventConsumer consumer = new EventConsumer(this.channel);
            log().info("Consuming queue '{}' with {}", queueName, consumer);
            this.channel.basicConsume(queueName, true, consumer);
        } catch (IOException e) {
            throw new IllegalStateException("Error binding queue to exchange.", e);
        }
    }

    /**
     * Registers a handler with the internal {@link EventRegistry}.
     *
     * @param eventHandler the handler to register
     */
    @Override
    public void registerHandler(EventHandler<?> eventHandler) {
        this.eventRegistry.register(eventHandler);
    }

    /**
     * Sets the default handler of the internal {@link EventRegistry}.
     *
     * @param eventHandler the default handler
     */
    @Override
    public void setDefaultHandler(EventHandler<?> eventHandler) {
        this.eventRegistry.setDefaultHandler(eventHandler);
    }

    /** Returns the logger of this class. */
    @Override
    Logger log() {
        return LOGGER;
    }
}
