package com.hazelcast.jet.impl.processor;

import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.util.concurrent.ManyToOneConcurrentArrayQueue;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.datamodel.Tuple3;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.pipeline.ServiceFactory;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayDeque;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/* loaded from: input_file:WEB-INF/lib/hazelcast-5.0.2.jar:com/hazelcast/jet/impl/processor/AsyncTransformUsingServiceUnorderedP.class */
public final class AsyncTransformUsingServiceUnorderedP<C, S, T, K, R> extends AbstractAsyncTransformUsingServiceP<C, S> {
    private final BiFunctionEx<? super S, ? super T, ? extends CompletableFuture<Traverser<R>>> callAsyncFn;
    private final Function<? super T, ? extends K> extractKeyFn;
    private ManyToOneConcurrentArrayQueue<Tuple3<T, Long, Object>> resultQueue;
    private final SortedMap<Long, Long> watermarkCounts;
    private final Map<T, Integer> inFlightItems;
    private Traverser<Object> currentTraverser;
    private Traverser<Map.Entry> snapshotTraverser;
    private Long lastReceivedWm;
    private long lastEmittedWm;
    private long minRestoredWm;
    private int asyncOpsCounter;
    private ArrayDeque<T> restoredObjects;

    @Probe(name = "numInFlightOps")
    private final Counter asyncOpsCounterMetric;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:WEB-INF/lib/hazelcast-5.0.2.jar:com/hazelcast/jet/impl/processor/AsyncTransformUsingServiceUnorderedP$Keys.class */
    private enum Keys {
        LAST_EMITTED_WM
    }

    private AsyncTransformUsingServiceUnorderedP(@Nonnull ServiceFactory<C, S> serviceFactory, @Nullable C c, int i, @Nonnull BiFunctionEx<? super S, ? super T, ? extends CompletableFuture<Traverser<R>>> biFunctionEx, @Nonnull Function<? super T, ? extends K> function) {
        super(serviceFactory, c, i, false);
        this.watermarkCounts = new TreeMap();
        this.inFlightItems = new IdentityHashMap();
        this.currentTraverser = Traversers.empty();
        this.lastReceivedWm = Long.MIN_VALUE;
        this.lastEmittedWm = Long.MIN_VALUE;
        this.minRestoredWm = Long.MAX_VALUE;
        this.restoredObjects = new ArrayDeque<>();
        this.asyncOpsCounterMetric = SwCounter.newSwCounter();
        this.callAsyncFn = biFunctionEx;
        this.extractKeyFn = function;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.hazelcast.jet.impl.processor.AbstractTransformUsingServiceP, com.hazelcast.jet.core.AbstractProcessor
    public void init(@Nonnull Processor.Context context) throws Exception {
        super.init(context);
        this.resultQueue = new ManyToOneConcurrentArrayQueue<>(this.maxConcurrentOps);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.hazelcast.jet.core.AbstractProcessor
    protected boolean tryProcess(int i, @Nonnull Object obj) {
        if (getOutbox().hasUnfinishedItem() && !emitFromTraverser(this.currentTraverser)) {
            return false;
        }
        this.asyncOpsCounterMetric.set(this.asyncOpsCounter);
        if (processItem(obj)) {
            return true;
        }
        tryFlushQueue();
        return false;
    }

    @CheckReturnValue
    private boolean processItem(@Nonnull T t) {
        if (this.asyncOpsCounter == this.maxConcurrentOps) {
            return false;
        }
        CompletableFuture<Traverser<R>> apply = this.callAsyncFn.apply(this.service, t);
        if (apply == null) {
            return true;
        }
        this.asyncOpsCounter++;
        this.watermarkCounts.merge(this.lastReceivedWm, 1L, (v0, v1) -> {
            return Long.sum(v0, v1);
        });
        Long l = this.lastReceivedWm;
        apply.whenComplete(ExceptionUtil.withTryCatch(getLogger(), (traverser, th) -> {
            this.resultQueue.add(Tuple3.tuple3(t, l, traverser != null ? traverser : th));
        }));
        this.inFlightItems.merge(t, 1, (v0, v1) -> {
            return Integer.sum(v0, v1);
        });
        return true;
    }

    @Override // com.hazelcast.jet.core.AbstractProcessor, com.hazelcast.jet.core.Processor
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        if (!emitFromTraverser(this.currentTraverser)) {
            return false;
        }
        if (!$assertionsDisabled && this.lastEmittedWm > this.lastReceivedWm.longValue()) {
            throw new AssertionError("lastEmittedWm=" + this.lastEmittedWm + ", lastReceivedWm=" + this.lastReceivedWm);
        }
        if (watermark.timestamp() <= this.lastReceivedWm.longValue()) {
            return true;
        }
        if (this.watermarkCounts.isEmpty()) {
            if (!tryEmit(watermark)) {
                return false;
            }
            this.lastEmittedWm = watermark.timestamp();
        }
        this.lastReceivedWm = Long.valueOf(watermark.timestamp());
        return true;
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean tryProcess() {
        tryFlushQueue();
        this.asyncOpsCounterMetric.set(this.asyncOpsCounter);
        return true;
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean complete() {
        return tryFlushQueue();
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean saveToSnapshot() {
        if (!$assertionsDisabled && !this.restoredObjects.isEmpty()) {
            throw new AssertionError("restoredObjects not empty");
        }
        if (!emitFromTraverser(this.currentTraverser)) {
            return false;
        }
        if (this.snapshotTraverser == null) {
            LoggingUtil.logFinest(getLogger(), "Saving to snapshot: %s, lastReceivedWm=%d", this.inFlightItems, this.lastReceivedWm);
            this.snapshotTraverser = Traversers.traverseIterable(this.inFlightItems.entrySet()).map(entry -> {
                return Util.entry(this.extractKeyFn.apply((Object) entry.getKey()), Tuple2.tuple2(entry.getKey(), entry.getValue()));
            }).append(Util.entry(BroadcastKey.broadcastKey(Keys.LAST_EMITTED_WM), this.lastReceivedWm)).onFirstNull(() -> {
                this.snapshotTraverser = null;
            });
        }
        return emitFromTraverserToSnapshot(this.snapshotTraverser);
    }

    @Override // com.hazelcast.jet.core.AbstractProcessor
    protected void restoreFromSnapshot(@Nonnull Object obj, @Nonnull Object obj2) {
        if (obj instanceof BroadcastKey) {
            if (!$assertionsDisabled && !((BroadcastKey) obj).key().equals(Keys.LAST_EMITTED_WM)) {
                throw new AssertionError("Unexpected key: " + obj);
            }
            this.minRestoredWm = Math.min(this.minRestoredWm, ((Long) obj2).longValue());
            return;
        }
        Tuple2 tuple2 = (Tuple2) obj2;
        for (int i = 0; i < ((Integer) tuple2.f1()).intValue(); i++) {
            this.restoredObjects.add(tuple2.f0());
            LoggingUtil.logFinest(getLogger(), "Restored: %s", tuple2.f0());
        }
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean finishSnapshotRestore() {
        while (true) {
            T peek = this.restoredObjects.peek();
            if (peek == null || !processItem(peek)) {
                break;
            }
            this.restoredObjects.remove();
        }
        if (!this.restoredObjects.isEmpty()) {
            tryFlushQueue();
            return false;
        }
        if (!emitFromTraverser(this.currentTraverser)) {
            return false;
        }
        this.restoredObjects = new ArrayDeque<>(0);
        this.lastReceivedWm = Long.valueOf(this.minRestoredWm);
        LoggingUtil.logFine(getLogger(), "restored lastReceivedWm=%s", Long.valueOf(this.minRestoredWm));
        return true;
    }

    private boolean tryFlushQueue() {
        while (emitFromTraverser(this.currentTraverser)) {
            Tuple3<T, Long, Object> poll = this.resultQueue.poll();
            if (poll == null) {
                return this.watermarkCounts.isEmpty();
            }
            this.asyncOpsCounter--;
            Integer merge = this.inFlightItems.merge(poll.f0(), -1, (num, num2) -> {
                if (num.intValue() == 1) {
                    return null;
                }
                return Integer.valueOf(num.intValue() + num2.intValue());
            });
            if (!$assertionsDisabled && merge != null && merge.intValue() <= 0) {
                throw new AssertionError("inFlightItemsCount=" + merge);
            }
            Long merge2 = this.watermarkCounts.merge(poll.f1(), -1L, (v0, v1) -> {
                return Long.sum(v0, v1);
            });
            if (!$assertionsDisabled && merge2.longValue() < 0) {
                throw new AssertionError("count=" + merge2);
            }
            if (poll.f2() instanceof Throwable) {
                throw new JetException("Async operation completed exceptionally: " + poll.f2(), (Throwable) poll.f2());
            }
            this.currentTraverser = (Traverser) poll.f2();
            if (this.currentTraverser == null) {
                this.currentTraverser = Traversers.empty();
            }
            if (merge2.longValue() <= 0) {
                long j = Long.MIN_VALUE;
                Iterator<Map.Entry<Long, Long>> it = this.watermarkCounts.entrySet().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    Map.Entry<Long, Long> next = it.next();
                    if (next.getValue().longValue() != 0) {
                        j = next.getKey().longValue();
                        break;
                    }
                    it.remove();
                }
                if (this.watermarkCounts.isEmpty() && this.lastReceivedWm.longValue() > this.lastEmittedWm) {
                    j = this.lastReceivedWm.longValue();
                }
                if (j > Long.MIN_VALUE && j > this.lastEmittedWm) {
                    this.lastEmittedWm = j;
                    this.currentTraverser = this.currentTraverser.append(new Watermark(j));
                }
            }
        }
        return false;
    }

    public static <C, S, T, K, R> ProcessorSupplier supplier(@Nonnull ServiceFactory<C, S> serviceFactory, int i, @Nonnull BiFunctionEx<? super S, ? super T, ? extends CompletableFuture<Traverser<R>>> biFunctionEx, @Nonnull FunctionEx<? super T, ? extends K> functionEx) {
        return ProcessorSupplierWithService.supplierWithService(serviceFactory, (serviceFactory2, obj) -> {
            return new AsyncTransformUsingServiceUnorderedP(serviceFactory2, obj, i, biFunctionEx, functionEx);
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 278902773:
                if (implMethodName.equals("lambda$supplier$a612041d$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/processor/AsyncTransformUsingServiceUnorderedP") && serializedLambda.getImplMethodSignature().equals("(ILcom/hazelcast/function/BiFunctionEx;Lcom/hazelcast/function/FunctionEx;Lcom/hazelcast/jet/pipeline/ServiceFactory;Ljava/lang/Object;)Lcom/hazelcast/jet/core/Processor;")) {
                    int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    BiFunctionEx biFunctionEx = (BiFunctionEx) serializedLambda.getCapturedArg(1);
                    FunctionEx functionEx = (FunctionEx) serializedLambda.getCapturedArg(2);
                    return (serviceFactory2, obj) -> {
                        return new AsyncTransformUsingServiceUnorderedP(serviceFactory2, obj, intValue, biFunctionEx, functionEx);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        $assertionsDisabled = !AsyncTransformUsingServiceUnorderedP.class.desiredAssertionStatus();
    }
}
