/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.core.client;

import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.fluo.api.client.Loader;
import org.apache.fluo.api.client.LoaderExecutor;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.metrics.MetricsReporter;
import org.apache.fluo.core.async.AsyncCommitObserver;
import org.apache.fluo.core.async.AsyncTransaction;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.TransactionImpl;
import org.apache.fluo.core.log.TracingTransaction;
import org.apache.fluo.core.util.Counter;
import org.apache.fluo.core.util.FluoExecutors;
import org.slf4j.LoggerFactory;

public class LoaderExecutorAsyncImpl
implements LoaderExecutor {
    private final ExecutorService executor;
    private final Semaphore semaphore;
    private final int semaphoreSize;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicReference<Throwable> exceptionRef = new AtomicReference<Object>(null);
    private final Environment env;
    private final Counter commiting = new Counter();

    private void setException(Throwable t) {
        if (!this.exceptionRef.compareAndSet(null, t)) {
            LoggerFactory.getLogger(LoaderExecutorAsyncImpl.class).debug("Multiple exceptions occured, not reporting subsequent ones", t);
        }
    }

    public LoaderExecutorAsyncImpl(FluoConfiguration config, Environment env) {
        this(config, config.getLoaderThreads(), config.getLoaderQueueSize(), env);
    }

    private LoaderExecutorAsyncImpl(FluoConfiguration config, int numThreads, int queueSize, Environment env) {
        if (numThreads < 0 || numThreads == 0 && queueSize != 0) {
            throw new IllegalArgumentException("numThreads must be positive OR numThreads and queueSize must both be 0");
        }
        if (queueSize < 0 || numThreads != 0 && queueSize == 0) {
            throw new IllegalArgumentException("queueSize must be non-negative OR numThreads and queueSize must both be 0");
        }
        this.env = env;
        this.semaphoreSize = queueSize == 0 ? 1 : queueSize;
        this.semaphore = new Semaphore(this.semaphoreSize);
        this.executor = numThreads == 0 ? MoreExecutors.sameThreadExecutor() : FluoExecutors.newFixedThreadPool(numThreads, "loader");
    }

    public void execute(Loader loader) {
        if (this.exceptionRef.get() != null) {
            throw new RuntimeException("Previous failure", this.exceptionRef.get());
        }
        try {
            while (!this.semaphore.tryAcquire(50L, TimeUnit.MILLISECONDS)) {
                if (!this.closed.get()) continue;
                throw new IllegalStateException("LoaderExecutor is closed");
            }
        }
        catch (InterruptedException e1) {
            throw new RuntimeException(e1);
        }
        try {
            this.commiting.increment();
            this.executor.execute(new QueueReleaseRunnable(new LoaderCommitObserver(loader)));
        }
        catch (RejectedExecutionException rje) {
            this.semaphore.release();
            this.commiting.decrement();
            throw rje;
        }
    }

    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            this.semaphore.acquireUninterruptibly(this.semaphoreSize);
            this.commiting.waitUntilZero();
            if (this.executor != null) {
                this.executor.shutdown();
                while (!this.executor.isTerminated()) {
                    try {
                        this.executor.awaitTermination(3L, TimeUnit.SECONDS);
                    }
                    catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            }
            if (this.exceptionRef.get() != null) {
                throw new RuntimeException(this.exceptionRef.get());
            }
            this.env.getSharedResources().getBatchWriter().waitForAsyncFlush();
        }
    }

    private class QueueReleaseRunnable
    implements Runnable {
        LoaderCommitObserver loaderTask;

        QueueReleaseRunnable(LoaderCommitObserver loaderTask) {
            this.loaderTask = loaderTask;
        }

        @Override
        public void run() {
            LoaderExecutorAsyncImpl.this.semaphore.release();
            this.loaderTask.run();
        }
    }

    class LoaderCommitObserver
    implements AsyncCommitObserver,
    Runnable {
        AsyncTransaction txi;
        Loader loader;
        private AtomicBoolean done = new AtomicBoolean(false);

        private void close() {
            this.txi = null;
            if (this.done.compareAndSet(false, true)) {
                LoaderExecutorAsyncImpl.this.commiting.decrement();
            } else {
                LoggerFactory.getLogger(LoaderCommitObserver.class).error("Close called twice ", (Throwable)new Exception());
            }
        }

        public LoaderCommitObserver(Loader loader2) {
            this.loader = loader2;
        }

        @Override
        public void committed() {
            this.close();
        }

        @Override
        public void failed(Throwable t) {
            this.close();
            LoaderExecutorAsyncImpl.this.setException(t);
        }

        @Override
        public void alreadyAcknowledged() {
            this.close();
            LoggerFactory.getLogger(LoaderCommitObserver.class).error("Already ack called for loader ", (Throwable)new Exception());
        }

        @Override
        public void commitFailed() {
            this.txi = null;
            LoaderExecutorAsyncImpl.this.executor.submit(this);
        }

        @Override
        public void run() {
            this.txi = new TransactionImpl(LoaderExecutorAsyncImpl.this.env);
            if (TracingTransaction.isTracingEnabled()) {
                this.txi = new TracingTransaction(this.txi, this.loader.getClass());
            }
            Loader.Context context = new Loader.Context(){

                public SimpleConfiguration getAppConfiguration() {
                    return LoaderExecutorAsyncImpl.this.env.getAppConfiguration();
                }

                public MetricsReporter getMetricsReporter() {
                    return LoaderExecutorAsyncImpl.this.env.getMetricsReporter();
                }
            };
            try {
                this.loader.load((TransactionBase)this.txi, context);
                LoaderExecutorAsyncImpl.this.env.getSharedResources().getCommitManager().beginCommit(this.txi, this.loader.getClass(), this);
            }
            catch (Exception e) {
                LoaderExecutorAsyncImpl.this.setException(e);
                this.close();
                LoggerFactory.getLogger(LoaderCommitObserver.class).debug(e.getMessage(), (Throwable)e);
            }
        }
    }
}

