/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.broadcast;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.queue.CompletableQueue;
import reactor.core.support.Assert;
import reactor.core.support.Exceptions;
import reactor.fn.Consumer;
import reactor.rx.action.Action;
import reactor.rx.subscription.PushSubscription;
import reactor.rx.subscription.ReactiveSubscription;

/**
 * An {@link Action} that re-emits every signal it receives to its downstream.
 *
 * <p>A freshly built instance uses {@link #HOT_SUBSCRIPTION} as its upstream
 * placeholder until {@link #onSubscribe(Subscription)} is called. Signals
 * pushed through {@link #onNext(Object)}, {@link #onError(Throwable)} and
 * {@link #onComplete()} are handed to the configured {@link Dispatcher}
 * whenever the caller is not already running in that dispatcher's context.
 *
 * @param <O> type of the values flowing through this broadcaster
 */
public class Broadcaster<O> extends Action<O, O> {

    /**
     * Placeholder upstream whose {@code request} and {@code cancel} do nothing.
     * It is installed by the constructor and compared by identity elsewhere in
     * this class to detect that no upstream has been attached yet.
     */
    public static final Subscription HOT_SUBSCRIPTION = new PushSubscription<Object>(null, null) {

        @Override
        public void request(long n) {
            // intentionally a no-op
        }

        @Override
        public void cancel() {
            // intentionally a no-op
        }
    };

    /** Dispatcher that signals are routed through. */
    protected final Dispatcher dispatcher;

    /** Environment passed at construction time; may be {@code null}. */
    protected final Environment environment;

    /**
     * Builds a broadcaster without an environment, bound to
     * {@link SynchronousDispatcher#INSTANCE} and with a capacity of
     * {@link Long#MAX_VALUE}.
     *
     * @param <T> type of the broadcast values
     * @return a new broadcaster
     */
    public static <T> Broadcaster<T> create() {
        return new Broadcaster<T>(null, SynchronousDispatcher.INSTANCE, Long.MAX_VALUE);
    }

    /**
     * Builds a broadcaster bound to the default dispatcher of {@code env}.
     *
     * @param env environment supplying the default dispatcher; dereferenced, so must not be {@code null}
     * @param <T> type of the broadcast values
     * @return a new broadcaster
     */
    public static <T> Broadcaster<T> create(Environment env) {
        Dispatcher defaultDispatcher = env.getDefaultDispatcher();
        return Broadcaster.<T>create(env, defaultDispatcher);
    }

    /**
     * Builds a broadcaster bound to {@code dispatcher}, without an environment.
     *
     * @param dispatcher dispatcher to route signals through
     * @param <T>        type of the broadcast values
     * @return a new broadcaster
     */
    public static <T> Broadcaster<T> create(Dispatcher dispatcher) {
        return Broadcaster.<T>create(null, dispatcher);
    }

    /**
     * Builds a broadcaster bound to {@code dispatcher}, sizing its capacity from
     * the dispatcher's backlog size.
     *
     * @param env        environment to keep a reference to; may be {@code null}
     * @param dispatcher dispatcher to route signals through; must support ordering
     * @param <T>        type of the broadcast values
     * @return a new broadcaster
     * @throws IllegalStateException presumably raised by {@code Assert.state} when the
     *                               dispatcher does not support ordering (confirm against Assert)
     */
    public static <T> Broadcaster<T> create(Environment env, Dispatcher dispatcher) {
        Assert.state(dispatcher.supportsOrdering(), "Dispatcher provided doesn't support event ordering.  For concurrent consume, refer to Stream#partition/groupBy() method and assign individual single dispatchers");
        long capacity = Action.evaluateCapacity(dispatcher.backlogSize());
        return new Broadcaster<T>(env, dispatcher, capacity);
    }

    /**
     * Stores the collaborators and installs {@link #HOT_SUBSCRIPTION} as the
     * initial upstream.
     *
     * @param environment environment to remember; may be {@code null}
     * @param dispatcher  dispatcher to route signals through
     * @param capacity    capacity forwarded to the {@link Action} constructor
     */
    protected Broadcaster(Environment environment, Dispatcher dispatcher, long capacity) {
        super(capacity);
        this.dispatcher = dispatcher;
        this.environment = environment;
        // HOT_SUBSCRIPTION is declared as Subscription, hence the (raw) downcast.
        this.upstreamSubscription = (PushSubscription) HOT_SUBSCRIPTION;
    }

    /** @return the dispatcher this broadcaster was built with */
    @Override
    public final Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    /** Forwards the value to {@code broadcastNext}. */
    @Override
    protected void doNext(O ev) {
        this.broadcastNext(ev);
    }

    /**
     * Accepts a value, re-dispatching it when the caller is outside the
     * dispatcher's context.
     *
     * @param ev the value to emit
     * @throws NullPointerException if {@code ev} is {@code null}
     */
    @Override
    public void onNext(O ev) {
        if (ev == null) {
            throw new NullPointerException("Spec 2.13: Signal cannot be null");
        }
        if (this.dispatcher.inContext()) {
            super.onNext(ev);
            return;
        }
        // This instance is the consumer; no error consumer is supplied.
        this.dispatcher.dispatch(ev, this, null);
    }

    /**
     * Attaches an upstream. When the current upstream is still the
     * {@link #HOT_SUBSCRIPTION} placeholder, it is cleared first and, once the
     * parent has processed the subscription, any pending downstream demand is
     * requested from the new upstream.
     *
     * @param subscription the upstream subscription
     */
    @Override
    public void onSubscribe(Subscription subscription) {
        boolean replacingPlaceholder = this.upstreamSubscription == HOT_SUBSCRIPTION;
        if (replacingPlaceholder) {
            this.upstreamSubscription = null;
        }
        super.onSubscribe(subscription);
        if (!replacingPlaceholder) {
            return;
        }

        PushSubscription<O> downstream = this.downstreamSubscription;
        if (downstream == null) {
            return;
        }
        if (downstream.pendingRequestSignals() > 0L) {
            // Re-read on purpose: mirrors the two separate reads of the pending count.
            subscription.request(downstream.pendingRequestSignals());
        }
    }

    /**
     * Accepts an error, re-dispatching it when the caller is outside the
     * dispatcher's context.
     *
     * @param cause the error to propagate
     * @throws NullPointerException if {@code cause} is {@code null}
     */
    @Override
    public void onError(Throwable cause) {
        if (cause == null) {
            throw new NullPointerException("Spec 2.13: Signal cannot be null");
        }
        if (this.dispatcher.inContext()) {
            super.onError(cause);
            return;
        }
        // NOTE(review): the dispatched path calls doError while the in-context path
        // calls super.onError - confirm against Action that this asymmetry is intended.
        this.dispatcher.dispatch(cause, new Consumer<Throwable>() {

            @Override
            public void accept(Throwable throwable) {
                Broadcaster.super.doError(throwable);
            }
        }, null);
    }

    /**
     * Accepts the completion signal, re-dispatching it when the caller is
     * outside the dispatcher's context.
     */
    @Override
    public void onComplete() {
        if (this.dispatcher.inContext()) {
            super.onComplete();
            return;
        }
        // No payload for completion: a null value is dispatched to a Consumer<Void>.
        this.dispatcher.dispatch(null, new Consumer<Void>() {

            @Override
            public void accept(Void aVoid) {
                Broadcaster.super.onComplete();
            }
        }, null);
    }

    /**
     * Creates the downstream subscription. With a queue, a
     * {@link ReactiveSubscription} is returned whose requests are also relayed
     * to {@link #requestUpstream(long, boolean, long)}; without one, the parent
     * implementation is used.
     *
     * @param subscriber the downstream subscriber
     * @param queue      buffer backing the subscription, or {@code null}
     * @return the subscription to hand to {@code subscriber}
     */
    @Override
    protected PushSubscription<O> createSubscription(Subscriber<? super O> subscriber, CompletableQueue<O> queue) {
        if (queue == null) {
            return super.createSubscription(subscriber, null);
        }
        return new ReactiveSubscription<O>(this, subscriber, queue) {

            @Override
            protected void onRequest(long elements) {
                // Requests are dropped entirely while no upstream reference is set.
                if (Broadcaster.this.upstreamSubscription == null) {
                    return;
                }
                super.onRequest(elements);
                long broadcasterCapacity = Broadcaster.this.capacity;
                boolean bufferComplete = this.buffer.isComplete();
                Broadcaster.this.requestUpstream(broadcasterCapacity, bufferComplete, elements);
            }
        };
    }

    /**
     * Creates the downstream subscription, turning the reactive-pull flag on
     * when the caller asks for it, or when the dispatcher is not
     * {@link SynchronousDispatcher#INSTANCE} and the current upstream exists
     * but reports no publisher.
     *
     * @param subscriber   the downstream subscriber
     * @param reactivePull whether the caller explicitly wants reactive pull
     * @return the subscription to hand to {@code subscriber}
     */
    @Override
    protected PushSubscription<O> createSubscription(Subscriber<? super O> subscriber, boolean reactivePull) {
        boolean usePull = reactivePull
                || (this.dispatcher != SynchronousDispatcher.INSTANCE
                        && this.upstreamSubscription != null
                        && !this.upstreamSubscription.hasPublisher());
        return super.createSubscription(subscriber, usePull);
    }

    /**
     * Registers {@code subscription} and notifies {@code subscriber}: with
     * {@code onSubscribe} on success, with an {@link IllegalStateException} via
     * {@code onError} when registration is refused. Non-fatal exceptions raised
     * along the way are routed to {@code subscriber.onError}.
     *
     * @param subscriber   the downstream subscriber
     * @param subscription the subscription created for it
     */
    @Override
    protected void subscribeWithSubscription(Subscriber<? super O> subscriber, PushSubscription<O> subscription) {
        try {
            if (this.addSubscription(subscription)) {
                subscriber.onSubscribe(subscription);
            } else {
                subscriber.onError(new IllegalStateException("The subscription cannot be linked to this Stream"));
            }
        }
        catch (Exception failure) {
            Exceptions.throwIfFatal(failure);
            subscriber.onError(failure);
        }
    }

    /** Cancels through the parent unless the upstream is still the placeholder. */
    @Override
    public void cancel() {
        if (this.upstreamSubscription == HOT_SUBSCRIPTION) {
            return;
        }
        super.cancel();
    }

    /**
     * Clears one subscription reference: the downstream one while the upstream
     * is still {@link #HOT_SUBSCRIPTION}, otherwise the upstream one.
     */
    @Override
    public void recycle() {
        // NOTE(review): only one of the two references is cleared per call -
        // confirm that leaving the downstream set for a non-placeholder upstream is intended.
        if (HOT_SUBSCRIPTION == this.upstreamSubscription) {
            this.downstreamSubscription = null;
        } else {
            this.upstreamSubscription = null;
        }
    }

    /**
     * Delegates to the parent and returns this instance for chaining.
     *
     * @param elements the new capacity
     * @return this broadcaster
     */
    @Override
    public Broadcaster<O> capacity(long elements) {
        super.capacity(elements);
        return this;
    }

    /**
     * Relays demand. With a non-placeholder upstream and no termination, the
     * demand goes to {@code requestMore}; otherwise it is recorded on the
     * downstream subscription, but only when that subscription currently
     * reports zero pending requests.
     *
     * @param capacity   not read by this implementation
     * @param terminated whether the requesting side is already complete
     * @param elements   number of elements requested
     */
    @Override
    protected void requestUpstream(long capacity, boolean terminated, long elements) {
        boolean upstreamAttached = this.upstreamSubscription != null
                && this.upstreamSubscription != HOT_SUBSCRIPTION;
        if (upstreamAttached && !terminated) {
            this.requestMore(elements);
            return;
        }

        PushSubscription<O> downstream = this.downstreamSubscription;
        if (downstream != null && downstream.pendingRequestSignals() == 0L) {
            downstream.updatePendingRequests(elements);
        }
    }
}

