package org.pentaho.di.trans.streaming.common;

import com.google.common.annotations.VisibleForTesting;
import io.reactivex.Observable;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.ReplayProcessor;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import org.pentaho.di.core.logging.LogChannel;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.streaming.api.StreamSource;

/* loaded from: input_file:org/pentaho/di/trans/streaming/common/BlockingQueueStreamSource.class */
public abstract class BlockingQueueStreamSource<T> implements StreamSource<T> {
    private static final Class<?> PKG;
    protected final BaseStreamStep streamStep;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final AtomicBoolean paused = new AtomicBoolean(false);
    private final FlowableProcessor<T> publishProcessor = ReplayProcessor.create();

    @VisibleForTesting
    Semaphore acceptingRowsSemaphore = new Semaphore(1);

    @VisibleForTesting
    LogChannel logChannel = new LogChannel(this);

    protected BlockingQueueStreamSource(BaseStreamStep baseStreamStep) {
        this.streamStep = baseStreamStep;
    }

    @Override // org.pentaho.di.trans.streaming.api.StreamSource
    public Observable<T> observable() {
        return Observable.fromPublisher(this.publishProcessor);
    }

    @Override // org.pentaho.di.trans.streaming.api.StreamSource
    public void close() {
        if (this.publishProcessor.hasComplete()) {
            return;
        }
        this.publishProcessor.onComplete();
    }

    @Override // org.pentaho.di.trans.streaming.api.StreamSource
    public synchronized void pause() {
        if (this.paused.getAndSet(true)) {
            return;
        }
        try {
            if (!$assertionsDisabled && this.acceptingRowsSemaphore.availablePermits() != 1) {
                throw new AssertionError();
            }
            this.acceptingRowsSemaphore.acquire();
        } catch (InterruptedException e) {
            this.logChannel.logError(BaseMessages.getString(PKG, "BlockingQueueStream.PauseInterrupt", new String[0]));
        }
    }

    @Override // org.pentaho.di.trans.streaming.api.StreamSource
    public synchronized void resume() {
        if (this.paused.getAndSet(false)) {
            if (!$assertionsDisabled && this.acceptingRowsSemaphore.availablePermits() != 0) {
                throw new AssertionError();
            }
            this.acceptingRowsSemaphore.release();
        }
    }

    protected void acceptRows(List<T> list) {
        try {
            this.acceptingRowsSemaphore.acquire();
            list.forEach(obj -> {
                this.streamStep.incrementLinesInput();
                this.publishProcessor.onNext(obj);
            });
        } catch (InterruptedException e) {
            this.logChannel.logError(BaseMessages.getString(PKG, "BlockingQueueStream.AcceptRowsInterrupt", new String[]{Arrays.toString(list.toArray())}));
        } finally {
            this.acceptingRowsSemaphore.release();
        }
    }

    public void error(Throwable th) {
        this.publishProcessor.onError(th);
    }

    static {
        $assertionsDisabled = !BlockingQueueStreamSource.class.desiredAssertionStatus();
        PKG = BlockingQueueStreamSource.class;
    }
}
