/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.core.async;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.fluo.core.async.AsyncCommitObserver;
import org.apache.fluo.core.async.AsyncTransaction;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.FluoConfigurationImpl;
import org.apache.fluo.core.util.Limit;
import org.apache.fluo.core.worker.TxResult;
import org.slf4j.LoggerFactory;

/**
 * Coordinates asynchronous transaction commits.
 *
 * <p>Each commit started through {@link #beginCommit} first acquires {@code tx.getSize()} units
 * from a shared {@link Limit} (sized from the transaction-commit-memory configuration value) and
 * is counted in a gauge registered under the "commits processing" metric name. Both are given
 * back exactly once when the commit reaches any terminal callback.
 */
public class CommitManager {
    /** Budget that every in-flight commit draws {@code tx.getSize()} units from. */
    private final Limit memoryLimit;
    /** Number of commits that have been started but have not yet reached a terminal callback. */
    private final AtomicInteger commitingTransactions;

    /**
     * Creates a manager and registers a gauge exposing the number of in-flight commits.
     *
     * @param env environment supplying the configuration, the metric registry and metric names
     */
    public CommitManager(Environment env) {
        this.memoryLimit = new Limit(FluoConfigurationImpl.getTxCommitMemory(env.getConfiguration()));
        this.commitingTransactions = new AtomicInteger(0);
        env.getSharedResources().getMetricRegistry().register(env.getMetricNames().getCommitsProcessing(), (Metric)new Gauge<Integer>(){

            @Override
            public Integer getValue() {
                return CommitManager.this.commitingTransactions.get();
            }
        });
    }

    /**
     * Starts an asynchronous commit of {@code tx}.
     *
     * <p>Acquires {@code tx.getSize()} units from the memory limit, bumps the in-flight counter,
     * stamps the commit begin time on the transaction stats and then calls
     * {@code tx.commitAsync(...)} with a wrapping observer. The wrapper forwards every callback to
     * {@code aco} and afterwards releases the acquired units, records stats and closes {@code tx}.
     *
     * <p>NOTE(review): {@code Limit.acquire} presumably blocks while the budget is exhausted;
     * confirm against {@code Limit}. Also, if {@code commitAsync} throws synchronously the
     * acquired units and the counter are not rolled back here; whether that can happen depends on
     * the {@code AsyncTransaction} contract.
     *
     * @param tx transaction to commit; must not be null
     * @param txExecClass class reported alongside the transaction stats; must not be null
     * @param aco observer notified of the commit outcome; must not be null
     * @throws NullPointerException if any argument is null
     */
    public void beginCommit(AsyncTransaction tx, Class<?> txExecClass, AsyncCommitObserver aco) {
        Objects.requireNonNull(tx, "tx");
        Objects.requireNonNull(txExecClass, "txExecClass");
        Objects.requireNonNull(aco, "aco");
        int size = tx.getSize();
        this.memoryLimit.acquire(size);
        this.commitingTransactions.incrementAndGet();
        CQCommitObserver myAco = new CQCommitObserver(tx, aco, txExecClass, size);
        tx.getStats().setCommitBeginTime(System.currentTimeMillis());
        tx.commitAsync(myAco);
    }

    /** Currently a no-op; this class holds nothing that needs explicit shutdown. */
    public void close() {
    }

    /**
     * Observer handed to {@code commitAsync}. Every callback is forwarded to the caller's observer
     * and then, in a {@code finally}, funnels into {@link #finish(TxResult)} so cleanup happens
     * even when the delegate throws.
     */
    private class CQCommitObserver
    implements AsyncCommitObserver {
        private final AsyncTransaction tx;
        private final AsyncCommitObserver aco;
        /** Units acquired from the memory limit for this commit; released in {@code finish}. */
        private final int size;
        /** Guards {@code finish} so cleanup runs at most once even if several callbacks fire. */
        private final AtomicBoolean finished = new AtomicBoolean(false);
        private final Class<?> txExecClass;

        /**
         * Performs the one-time end-of-commit bookkeeping: decrements the in-flight counter,
         * records and reports stats, releases the memory units and closes the transaction.
         *
         * <p>Releasing the memory units and closing the transaction happen in a {@code finally}
         * so that a failure while recording or reporting stats cannot leak the acquired budget
         * or leave the transaction open. Such a failure still propagates to the caller.
         *
         * @param status outcome whose {@code toString()} is reported with the stats
         */
        private void finish(TxResult status) {
            if (!this.finished.compareAndSet(false, true)) {
                // Another callback already completed the bookkeeping.
                return;
            }
            CommitManager.this.commitingTransactions.decrementAndGet();
            try {
                this.tx.getStats().setCommitFinishTime(System.currentTimeMillis());
                this.tx.getStats().report(status.toString(), this.txExecClass);
            }
            finally {
                CommitManager.this.memoryLimit.release(this.size);
                try {
                    this.tx.close();
                }
                catch (Exception e) {
                    // Best effort: a failed close is logged, never propagated.
                    LoggerFactory.getLogger(CommitManager.class).warn("Failed to close transaction ", e);
                }
            }
        }

        /**
         * @param tx transaction being committed
         * @param aco caller-supplied observer that callbacks are forwarded to
         * @param txExecClass class reported alongside the transaction stats
         * @param size units acquired from the memory limit on behalf of {@code tx}
         */
        public CQCommitObserver(AsyncTransaction tx, AsyncCommitObserver aco, Class<?> txExecClass, int size) {
            this.tx = tx;
            this.aco = aco;
            this.size = size;
            this.txExecClass = txExecClass;
        }

        /** Forwards to the delegate, then finishes with {@code TxResult.COMMITTED}. */
        @Override
        public void committed() {
            try {
                this.aco.committed();
            }
            finally {
                this.finish(TxResult.COMMITTED);
            }
        }

        /** Forwards to the delegate, then finishes with {@code TxResult.ERROR}. */
        @Override
        public void failed(Throwable t) {
            try {
                this.aco.failed(t);
            }
            finally {
                this.finish(TxResult.ERROR);
            }
        }

        /** Forwards to the delegate, then finishes with {@code TxResult.AACKED}. */
        @Override
        public void alreadyAcknowledged() {
            try {
                this.aco.alreadyAcknowledged();
            }
            finally {
                this.finish(TxResult.AACKED);
            }
        }

        /** Forwards to the delegate, then finishes with {@code TxResult.COMMIT_EXCEPTION}. */
        @Override
        public void commitFailed() {
            try {
                this.aco.commitFailed();
            }
            finally {
                this.finish(TxResult.COMMIT_EXCEPTION);
            }
        }
    }
}

