package reactor.rx.action.pair;

import java.util.HashMap;
import java.util.Map;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.fn.BiFunction;
import reactor.fn.tuple.Tuple;
import reactor.fn.tuple.Tuple2;
import reactor.rx.action.Action;
import reactor.rx.stream.MapStream;
import reactor.rx.subscription.PushSubscription;

/* loaded from: input_file:WEB-INF/lib/reactor-stream-2.0.8.RELEASE.jar:reactor/rx/action/pair/ScanByKeyAction.class */
/**
 * Action that keeps a running, per-key accumulation of the values it receives.
 *
 * <p>For every incoming pair the value currently held in {@link #store} under the pair's key is
 * combined with the incoming value through {@link #fn}; the result is written back to the store.
 * When no {@link #mapListener} is configured, the {@code (key, result)} pair is pushed to the
 * downstream subscription directly. When a listener is configured, each subscriber is instead fed
 * from the listener: every {@code put} signal it publishes is forwarded as that signal's pair.
 *
 * @param <K> key type of the scanned pairs
 * @param <V> value type of the scanned pairs
 */
public class ScanByKeyAction<K, V> extends Action<Tuple2<K, V>, Tuple2<K, V>> {
    /** Accumulator, invoked as {@code fn.apply(storedValue, incomingValue)} when the key already has a value. */
    protected final BiFunction<? super V, ? super V, V> fn;
    /** Optional publisher of store change signals; {@code null} means results are emitted directly. */
    protected final Publisher<? extends MapStream.Signal<K, V>> mapListener;
    /** Holds the accumulated value for each key seen so far. */
    protected final Map<K, V> store;

    /**
     * Creates an action that uses {@code mapStream} both as the backing store and as the
     * publisher of change signals.
     *
     * @param biFunction accumulator applied to the stored and the incoming value
     * @param mapStream  store that also publishes its own change signals; may be {@code null}
     */
    public ScanByKeyAction(BiFunction<? super V, ? super V, V> biFunction, MapStream<K, V> mapStream) {
        this(biFunction, mapStream, mapStream);
    }

    /**
     * Creates an action with an explicit store and an optional change-signal publisher.
     *
     * @param biFunction accumulator applied to the stored and the incoming value
     * @param map        backing store; a new {@link HashMap} is used when {@code null}
     * @param publisher  source of store change signals; when {@code null} and the store is itself
     *                   a {@link MapStream}, the store is used as the listener, otherwise no
     *                   listener is configured
     */
    public ScanByKeyAction(BiFunction<? super V, ? super V, V> biFunction, Map<K, V> map, Publisher<? extends MapStream.Signal<K, V>> publisher) {
        this.fn = biFunction;
        this.store = map == null ? new HashMap<K, V>() : map;
        if (publisher != null) {
            this.mapListener = publisher;
        } else if (this.store instanceof MapStream) {
            // The instanceof test guarantees the cast succeeds, so no ClassCastException handling is needed.
            this.mapListener = (MapStream<K, V>) this.store;
        } else {
            this.mapListener = null;
        }
    }

    /**
     * Subscribes {@code subscriber} to this action.
     *
     * <p>Without a map listener this delegates to the superclass. With a listener, a dedicated
     * subscription is opened on the listener and bridged to a {@link PushSubscription} handed to
     * {@code subscriber}.
     *
     * @param subscriber the downstream subscriber
     */
    @Override
    public void subscribe(final Subscriber<? super Tuple2<K, V>> subscriber) {
        if (this.mapListener == null) {
            super.subscribe(subscriber);
            return;
        }
        this.mapListener.subscribe(new Subscriber<MapStream.Signal<K, V>>() {
            /** Subscription on the map listener. */
            Subscription s;
            /** Subscription handed to the downstream subscriber; null until onSubscribe has run. */
            PushSubscription<Tuple2<K, V>> child;

            @Override
            public void onSubscribe(final Subscription subscription) {
                this.s = subscription;
                this.child = new PushSubscription<Tuple2<K, V>>(ScanByKeyAction.this, subscriber) {
                    @Override
                    public void onRequest(long j) {
                        // Demand goes to the listener first, then to this action's own upstream.
                        subscription.request(j);
                        if (ScanByKeyAction.this.upstreamSubscription == null) {
                            // No upstream yet: record the demand on this subscription instead.
                            updatePendingRequests(j);
                        } else {
                            ScanByKeyAction.this.requestUpstream(-1L, isComplete(), j);
                        }
                    }

                    @Override
                    public void cancel() {
                        super.cancel();
                        subscription.cancel();
                    }
                };
                ScanByKeyAction.this.addSubscription(this.child);
                subscriber.onSubscribe(this.child);
            }

            @Override
            public void onNext(MapStream.Signal<K, V> signal) {
                // Only put signals are forwarded; anything else, or a signal before onSubscribe, is dropped.
                if (this.child == null || signal.op() != MapStream.Operation.put) {
                    return;
                }
                ScanByKeyAction.this.doNext(this.child, signal.pair());
            }

            @Override
            public void onError(Throwable th) {
                // NOTE(review): Reactive Streams rule 2.3 appears to forbid calling Subscription
                // methods from terminal callbacks; cancel() is kept to preserve existing
                // behaviour - confirm whether it can be dropped.
                if (this.s != null) {
                    this.s.cancel();
                }
                if (this.child != null) {
                    this.child.onError(th);
                }
            }

            @Override
            public void onComplete() {
                // NOTE(review): same rule 2.3 concern as in onError; kept for compatibility.
                if (this.s != null) {
                    this.s.cancel();
                }
                if (this.child != null) {
                    this.child.onComplete();
                }
            }
        });
    }

    /**
     * Folds {@code tuple2} into the store and, when no map listener is configured, emits the
     * updated pair downstream.
     *
     * @param tuple2 incoming pair; {@code t1} is the key, {@code t2} the value to accumulate
     */
    @Override
    public void doNext(Tuple2<K, V> tuple2) {
        V previous = this.store.get(tuple2.t1);
        // The first value seen for a key is stored as-is; later values are combined with the stored one.
        V apply = previous == null ? tuple2.t2 : this.fn.apply(previous, tuple2.t2);
        this.store.put(tuple2.t1, apply);
        // With a listener configured, emission happens through the listener subscription in subscribe().
        if (this.mapListener != null || this.downstreamSubscription == null) {
            return;
        }
        doNext(this.downstreamSubscription, Tuple.of(tuple2.t1, apply));
    }

    /**
     * Pushes {@code tuple2} to {@code pushSubscription}, routing any throwable raised by the
     * delivery to the same subscription's {@code onError}.
     *
     * @param pushSubscription target subscription
     * @param tuple2           pair to deliver
     */
    protected void doNext(PushSubscription<Tuple2<K, V>> pushSubscription, Tuple2<K, V> tuple2) {
        try {
            pushSubscription.onNext(tuple2);
        } catch (Throwable th) {
            pushSubscription.onError(th);
        }
    }
}
