package com.hazelcast.topic.impl.reliable;

import com.hazelcast.cluster.Member;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.logging.ILogger;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.spi.exception.DistributedObjectDestroyedException;
import com.hazelcast.topic.Message;
import com.hazelcast.topic.ReliableMessageListener;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

/* loaded from: input_file:WEB-INF/lib/hazelcast-5.0.2.jar:com/hazelcast/topic/impl/reliable/MessageRunner.class */
public abstract class MessageRunner<E> implements BiConsumer<ReadResultSet<ReliableTopicMessage>, Throwable> {
    protected final Ringbuffer<ReliableTopicMessage> ringbuffer;
    protected final ILogger logger;
    protected final ReliableMessageListener<E> listener;
    protected final String topicName;
    protected volatile long sequence;
    private final SerializationService serializationService;
    private final ConcurrentMap<UUID, MessageRunner<E>> runnersMap;
    private final UUID id;
    private final Executor executor;
    private final int batchSize;
    private volatile boolean cancelled;

    public MessageRunner(UUID uuid, ReliableMessageListener<E> reliableMessageListener, Ringbuffer<ReliableTopicMessage> ringbuffer, String str, int i, SerializationService serializationService, Executor executor, ConcurrentMap<UUID, MessageRunner<E>> concurrentMap, ILogger iLogger) {
        this.id = uuid;
        this.listener = reliableMessageListener;
        this.ringbuffer = ringbuffer;
        this.topicName = str;
        this.serializationService = serializationService;
        this.logger = iLogger;
        this.batchSize = i;
        this.executor = executor;
        this.runnersMap = concurrentMap;
        long retrieveInitialSequence = reliableMessageListener.retrieveInitialSequence();
        this.sequence = retrieveInitialSequence == -1 ? ringbuffer.tailSequence() + 1 : retrieveInitialSequence;
    }

    public void next() {
        if (this.cancelled) {
            return;
        }
        this.ringbuffer.readManyAsync(this.sequence, 1, this.batchSize, null).whenCompleteAsync(this, this.executor);
    }

    @Override // java.util.function.BiConsumer
    public void accept(ReadResultSet<ReliableTopicMessage> readResultSet, Throwable th) {
        if (this.cancelled) {
            return;
        }
        if (th != null) {
            if (handleInternalException(adjustThrowable(th))) {
                next();
                return;
            } else {
                cancel();
                return;
            }
        }
        long nextSequenceToReadFrom = (readResultSet.getNextSequenceToReadFrom() - readResultSet.readCount()) - this.sequence;
        if (nextSequenceToReadFrom != 0 && !isLossTolerable(nextSequenceToReadFrom)) {
            cancel();
            return;
        }
        for (int i = 0; i < readResultSet.size(); i++) {
            ReliableTopicMessage reliableTopicMessage = readResultSet.get(i);
            try {
                this.listener.storeSequence(readResultSet.getSequence(i));
                this.listener.onMessage(toMessage(reliableTopicMessage));
            } catch (Throwable th2) {
                if (terminate(th2)) {
                    cancel();
                    return;
                }
            }
        }
        this.sequence = readResultSet.getNextSequenceToReadFrom();
        next();
    }

    private Message<E> toMessage(ReliableTopicMessage reliableTopicMessage) {
        Member member = getMember(reliableTopicMessage);
        return new Message<>(this.topicName, this.serializationService.toObject(reliableTopicMessage.getPayload()), reliableTopicMessage.getPublishTime(), member);
    }

    protected abstract Member getMember(ReliableTopicMessage reliableTopicMessage);

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean handleInternalException(Throwable th) {
        if (th instanceof OperationTimeoutException) {
            return handleOperationTimeoutException();
        }
        if (th instanceof IllegalArgumentException) {
            return handleIllegalArgumentException((IllegalArgumentException) th);
        }
        if (th instanceof HazelcastInstanceNotActiveException) {
            if (!this.logger.isFinestEnabled()) {
                return false;
            }
            this.logger.finest("Terminating MessageListener " + this.listener + " on topic: " + this.topicName + ".  Reason: HazelcastInstance is shutting down");
            return false;
        }
        if (!(th instanceof DistributedObjectDestroyedException)) {
            this.logger.warning("Terminating MessageListener " + this.listener + " on topic: " + this.topicName + ". Reason: Unhandled exception, message: " + th.getMessage(), th);
            return false;
        }
        if (!this.logger.isFinestEnabled()) {
            return false;
        }
        this.logger.finest("Terminating MessageListener " + this.listener + " on topic: " + this.topicName + ". Reason: Topic is destroyed");
        return false;
    }

    private boolean handleOperationTimeoutException() {
        if (!this.logger.isFinestEnabled()) {
            return true;
        }
        this.logger.finest("MessageListener " + this.listener + " on topic: " + this.topicName + " timed out. Continuing from last known sequence: " + this.sequence);
        return true;
    }

    protected abstract Throwable adjustThrowable(Throwable th);

    private boolean isLossTolerable(long j) {
        if (!this.listener.isLossTolerant()) {
            this.logger.warning("Terminating MessageListener:" + this.listener + " on topic: " + this.topicName + ". Reason: The listener was too slow or the retention period of the message has been violated. " + j + " messages lost.");
            return false;
        }
        if (!this.logger.isFinestEnabled()) {
            return true;
        }
        this.logger.finest("MessageListener " + this.listener + " on topic: " + this.topicName + " lost " + j + "messages");
        return true;
    }

    private boolean handleIllegalArgumentException(IllegalArgumentException illegalArgumentException) {
        long headSequence = this.ringbuffer.headSequence();
        if (!this.listener.isLossTolerant()) {
            this.logger.warning("Terminating MessageListener:" + this.listener + " on topic: " + this.topicName + ". Reason: Underlying ring buffer data related to reliable topic is lost. ");
            return false;
        }
        if (this.logger.isFinestEnabled()) {
            this.logger.finest(String.format("MessageListener %s on topic %s requested a too large sequence: %s. . Jumping from old sequence: %s to sequence: %s", this.listener, this.topicName, illegalArgumentException.getMessage(), Long.valueOf(this.sequence), Long.valueOf(headSequence)));
        }
        this.sequence = headSequence;
        return true;
    }

    public void cancel() {
        this.cancelled = true;
        this.runnersMap.remove(this.id);
    }

    private boolean terminate(Throwable th) {
        if (this.cancelled) {
            return true;
        }
        try {
            boolean isTerminal = this.listener.isTerminal(th);
            if (isTerminal) {
                this.logger.warning("Terminating MessageListener " + this.listener + " on topic: " + this.topicName + ". Reason: Unhandled exception, message: " + th.getMessage(), th);
            } else if (this.logger.isFinestEnabled()) {
                this.logger.finest("MessageListener " + this.listener + " on topic: " + this.topicName + " ran into an exception: message:" + th.getMessage(), th);
            }
            return isTerminal;
        } catch (Throwable th2) {
            this.logger.warning("Terminating messageListener:" + this.listener + " on topic: " + this.topicName + ". Reason: Unhandled exception while calling ReliableMessageListener.isTerminal() method", th2);
            return true;
        }
    }

    public boolean isCancelled() {
        return this.cancelled;
    }
}
