/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.core.async;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.ConditionalWriter;
import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.util.FluoExecutors;
import org.apache.fluo.core.util.Limit;

public class AsyncConditionalWriter
implements AsyncFunction<Collection<ConditionalMutation>, Iterator<ConditionalWriter.Result>> {
    private final ConditionalWriter cw;
    private final ListeningExecutorService les;
    private final Limit semaphore;

    public AsyncConditionalWriter(Environment env, ConditionalWriter cw) {
        this.cw = cw;
        int numThreads = env.getConfiguration().getInt("fluo.impl.async.cw.threads", 8);
        int permits = env.getConfiguration().getInt("fluo.impl.async.cw.limit", 100000);
        this.les = MoreExecutors.listeningDecorator((ExecutorService)FluoExecutors.newFixedThreadPool(numThreads, "asyncCW"));
        this.semaphore = new Limit(permits);
    }

    public ListenableFuture<Iterator<ConditionalWriter.Result>> apply(Collection<ConditionalMutation> input) {
        if (input.size() == 0) {
            return Futures.immediateFuture(Collections.emptyList().iterator());
        }
        this.semaphore.acquire(input.size());
        Iterator iter = this.cw.write(input.iterator());
        return this.les.submit((Callable)new IterTask(iter, input.size()));
    }

    public void close() {
        this.les.shutdownNow();
        try {
            this.les.awaitTermination(5L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private class IterTask
    implements Callable<Iterator<ConditionalWriter.Result>> {
        private Iterator<ConditionalWriter.Result> input;
        private int permitsAcquired;

        public IterTask(Iterator<ConditionalWriter.Result> iter, int permitsAcquired) {
            this.input = iter;
            this.permitsAcquired = permitsAcquired;
        }

        @Override
        public Iterator<ConditionalWriter.Result> call() throws Exception {
            try {
                ImmutableList.Builder imlb = ImmutableList.builder();
                while (this.input.hasNext()) {
                    ConditionalWriter.Result result = this.input.next();
                    imlb.add((Object)result);
                }
                UnmodifiableIterator unmodifiableIterator = imlb.build().iterator();
                return unmodifiableIterator;
            }
            finally {
                AsyncConditionalWriter.this.semaphore.release(this.permitsAcquired);
            }
        }
    }
}

