package reactor.rx.action.aggregation;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.fn.Consumer;
import reactor.fn.Pausable;
import reactor.fn.timer.Timer;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.broadcast.Broadcaster;
import reactor.rx.subscription.ReactiveSubscription;

/* loaded from: input_file:WEB-INF/lib/reactor-stream-2.0.8.RELEASE.jar:reactor/rx/action/aggregation/WindowShiftAction.class */
/**
 * Action that re-emits its input as a sequence of windows, each window being a
 * {@link Stream} handed downstream through {@code broadcastNext}.
 *
 * <p>Several windows may be open at the same time: every incoming value is pushed
 * to all windows currently held in {@code currentWindows}. A window is closed
 * (completed and removed) as soon as it has received {@code batchSize} values.
 *
 * <p>Windows are opened in one of two ways, decided by whether {@code timer} is set:
 * <ul>
 *   <li><b>count mode</b> ({@code timer == null}): a new window is opened on every
 *       {@code skip}-th value (values number 0, skip, 2*skip, ...);</li>
 *   <li><b>timed mode</b> ({@code timer != null}): {@code timeshiftTask} is
 *       registered on the timer at subscription and opens a window each time it
 *       runs; each such window is additionally closed by a task submitted to the
 *       timer with the {@code timespan} delay.</li>
 * </ul>
 *
 * <p>NOTE(review): in count mode a {@code skip} of zero makes {@code doNext} fail
 * with an {@link ArithmeticException} on the modulo; no validation is done here.
 *
 * @param <T> type of the values carried by the windows
 */
public class WindowShiftAction<T> extends Action<T, Stream<T>> {
    private final Consumer<Long> timeshiftTask;
    private final List<ReactiveSubscription<T>> currentWindows = new LinkedList<>();
    private final int skip;
    private final int batchSize;
    private final long timeshift;
    private final TimeUnit unit;
    private final Timer timer;
    private final Environment environment;
    private final Dispatcher dispatcher;
    private int index;
    private Pausable timeshiftRegistration;

    /**
     * Timer task used in timed mode to open a new window.
     *
     * <p>This is the decompiled form of the anonymous class
     * {@code WindowShiftAction$2}; the {@code val$} fields are the variables it
     * captured from the constructor.
     */
    public class AnonymousClass2 implements Consumer<Long> {
        final Dispatcher val$dispatcher;
        final Timer val$timer;
        final Consumer<ReactiveSubscription<T>> val$flushTimerTask;
        final long val$timespan;
        final TimeUnit val$targetUnit;

        AnonymousClass2(Dispatcher dispatcher, Timer timer, Consumer<ReactiveSubscription<T>> flushTimerTask,
                        long timespan, TimeUnit targetUnit) {
            this.val$dispatcher = dispatcher;
            this.val$timer = timer;
            this.val$flushTimerTask = flushTimerTask;
            this.val$timespan = timespan;
            this.val$targetUnit = targetUnit;
        }

        /**
         * Opens a new window on the dispatcher and submits the task that will
         * later close it. Does nothing when the action is not publishing.
         *
         * @param tick value supplied by the timer; not used
         */
        @Override
        public void accept(Long tick) {
            if (!WindowShiftAction.this.isPublishing()) {
                return;
            }
            // Window creation touches currentWindows, so it is routed through the dispatcher
            // rather than executed directly by the timer callback.
            this.val$dispatcher.dispatch(null, new Consumer<Void>() {
                @Override
                public void accept(Void ignored) {
                    final ReactiveSubscription<T> window = WindowShiftAction.this.createWindowStream();
                    // Submit the closing task for this specific window with the timespan delay;
                    // the actual close is again dispatched.
                    AnonymousClass2.this.val$timer.submit(new Consumer<Long>() {
                        @Override
                        public void accept(Long closeTick) {
                            AnonymousClass2.this.val$dispatcher.dispatch(
                                    window, AnonymousClass2.this.val$flushTimerTask, null);
                        }
                    }, AnonymousClass2.this.val$timespan, AnonymousClass2.this.val$targetUnit);
                }
            }, null);
        }
    }

    /**
     * Creates a count-mode action (no timer).
     *
     * @param environment environment passed to every window's broadcaster
     * @param dispatcher  dispatcher passed to every window's broadcaster
     * @param size        number of values after which a window is closed
     * @param skip        a new window is opened every {@code skip} values
     */
    public WindowShiftAction(Environment environment, Dispatcher dispatcher, int size, int skip) {
        this(environment, dispatcher, size, skip, -1L, -1L, null, null);
    }

    /**
     * Creates an action that is timed when both {@code timespan} and
     * {@code timeshift} are strictly positive, and count-based otherwise.
     *
     * @param environment environment passed to every window's broadcaster
     * @param dispatcher  dispatcher passed to every window's broadcaster and used to run timer work
     * @param size        number of values after which a window is closed
     * @param skip        in count mode, a new window is opened every {@code skip} values
     * @param timespan    delay after which a timer-opened window is closed
     * @param timeshift   delay/period handed to {@code timer.schedule} for opening windows
     * @param unit        unit of {@code timespan} and {@code timeshift}; {@code null} means seconds
     * @param timer       timer driving timed mode; when {@code null} nothing is scheduled
     *                    and windows are opened by count
     */
    public WindowShiftAction(Environment environment, Dispatcher dispatcher, int size, int skip,
                             long timespan, long timeshift, TimeUnit unit, Timer timer) {
        this.dispatcher = dispatcher;
        this.skip = skip;
        this.environment = environment;
        this.batchSize = size;

        if (timespan <= 0 || timeshift <= 0) {
            // Count mode: all timer-related state is disabled.
            this.timeshift = -1L;
            this.unit = null;
            this.timer = null;
            this.timeshiftTask = null;
            return;
        }

        TimeUnit targetUnit = unit != null ? unit : TimeUnit.SECONDS;

        // Closes one specific window, if it is still open (it may already have been
        // closed by reaching batchSize or by a terminal signal).
        Consumer<ReactiveSubscription<T>> flushTimerTask = new Consumer<ReactiveSubscription<T>>() {
            @Override
            public void accept(ReactiveSubscription<T> expired) {
                Iterator<ReactiveSubscription<T>> it = WindowShiftAction.this.currentWindows.iterator();
                while (it.hasNext()) {
                    // Identity comparison: only the exact window instance is closed.
                    if (expired == it.next()) {
                        it.remove();
                        expired.onComplete();
                        return;
                    }
                }
            }
        };

        this.timeshiftTask = new AnonymousClass2(dispatcher, timer, flushTimerTask, timespan, targetUnit);
        this.timeshift = timeshift;
        this.unit = targetUnit;
        this.timer = timer;
    }

    /**
     * In timed mode, registers the window-opening task on the timer and keeps the
     * registration so it can be cancelled on termination.
     */
    @Override // reactor.rx.action.Action
    public void doOnSubscribe(Subscription subscription) {
        if (this.timer != null) {
            this.timeshiftRegistration = this.timer.schedule(this.timeshiftTask, this.timeshift, this.unit);
        }
    }

    /**
     * Opens a new window when due (count mode only), then delivers the value to
     * every open window.
     */
    @Override // reactor.rx.action.Action
    protected void doNext(T t) {
        if (this.timer == null) {
            int position = this.index;
            this.index = position + 1;
            if (position % this.skip == 0) {
                createWindowStream();
            }
        }
        flushCallback(t);
    }

    /**
     * Stops the timer task, completes every open window, then completes this action.
     */
    @Override // reactor.rx.action.Action
    public void doComplete() {
        cancelTimeshift();
        for (ReactiveSubscription<T> window : this.currentWindows) {
            window.onComplete();
        }
        this.currentWindows.clear();
        super.doComplete();
    }

    /**
     * Pushes a value to every open window and closes the windows that have now
     * received {@code batchSize} values.
     */
    private void flushCallback(T t) {
        Iterator<ReactiveSubscription<T>> it = this.currentWindows.iterator();
        while (it.hasNext()) {
            ReactiveSubscription<T> window = it.next();
            window.onNext(t);
            if (window.currentNextSignals() == this.batchSize) {
                // Remove before completing so the window is no longer tracked when onComplete runs.
                it.remove();
                window.onComplete();
            }
        }
    }

    /**
     * Opens a new window: creates a broadcaster, wraps it in a subscription that is
     * tracked in {@code currentWindows}, and emits the broadcaster downstream.
     *
     * @return the subscription used to feed and terminate the new window
     */
    protected ReactiveSubscription<T> createWindowStream() {
        Broadcaster<T> window = Broadcaster.create(this.environment, this.dispatcher);
        ReactiveSubscription<T> windowSubscription = new ReactiveSubscription<>(null, window);
        this.currentWindows.add(windowSubscription);
        window.onSubscribe(windowSubscription);
        broadcastNext(window);
        return windowSubscription;
    }

    @Override // reactor.rx.Stream
    public final Environment getEnvironment() {
        return this.environment;
    }

    @Override // reactor.rx.Stream
    public final Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    /**
     * Stops the timer task, propagates the error through the parent implementation,
     * then fails every open window with the same error.
     */
    @Override // reactor.rx.action.Action
    public void doError(Throwable th) {
        cancelTimeshift();
        super.doError(th);
        for (ReactiveSubscription<T> window : this.currentWindows) {
            window.onError(th);
        }
        this.currentWindows.clear();
    }

    /**
     * Cancels the timer registration obtained in {@link #doOnSubscribe}, if any, so
     * the window-opening task does not stay registered after this action terminated.
     */
    private void cancelTimeshift() {
        Pausable registration = this.timeshiftRegistration;
        if (registration != null) {
            this.timeshiftRegistration = null;
            registration.cancel();
        }
    }
}
