/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.core.impl;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.Mutation;
import org.apache.fluo.core.util.FluoThreadFactory;

/**
 * Funnels mutations submitted by many threads through a single {@link BatchWriter}.
 *
 * <p>Callers enqueue batches of mutations on a bounded queue. One daemon background thread drains
 * the queue, adds every pending mutation to the writer and issues one flush per drained group.
 * Three submission styles are offered: blocking ({@link #writeMutations}), future based
 * ({@link #writeMutationsAsyncFuture(Collection)}) and fire-and-forget
 * ({@link #writeMutationsAsync}) combined with {@link #waitForAsyncFlush()}.
 */
public class SharedBatchWriter {
    /** Capacity of the batch queue; producers block in {@code put} once it is full. */
    private static final int QUEUE_CAPACITY = 100000;

    /** Body of the future tasks handed out; running one merely marks the future as done. */
    private static final Runnable DO_NOTHING = new Runnable(){

        @Override
        public void run() {
        }
    };

    private final BatchWriter bw;
    private final ArrayBlockingQueue<MutationBatch> mutQueue;
    /** Sentinel batch; once the flush thread has processed it, the thread stops. */
    private final MutationBatch end = new MutationBatch(new ArrayList<Mutation>(), false);
    /** Number of fire-and-forget batches submitted so far. */
    private final AtomicLong asyncBatchesAdded = new AtomicLong(0L);
    /** Number of fire-and-forget batches flushed so far; guarded by {@code this}. */
    private long asyncBatchesProcessed = 0L;

    /**
     * Creates the writer and starts its daemon flush thread.
     *
     * @param bw the underlying writer that receives all mutations
     */
    SharedBatchWriter(BatchWriter bw) {
        this.bw = bw;
        this.mutQueue = new ArrayBlockingQueue<MutationBatch>(QUEUE_CAPACITY);
        Thread thread = new FluoThreadFactory("sharedBW").newThread(new FlushTask());
        thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.err.println("Uncaught exception in shared batch writer");
                e.printStackTrace();
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Writes a single mutation and blocks until the flush thread has flushed it.
     *
     * @param m the mutation to write
     */
    void writeMutation(Mutation m) {
        writeMutations(Collections.singletonList(m));
    }

    /**
     * Writes the given mutations and blocks until the flush thread has flushed them.
     * An empty collection is a no-op.
     *
     * @param ml the mutations to write
     * @throws RuntimeException wrapping {@link InterruptedException} if the calling thread is
     *         interrupted while queueing or waiting; the interrupt flag is restored first
     */
    void writeMutations(Collection<Mutation> ml) {
        if (ml.isEmpty()) {
            return;
        }
        MutationBatch mb = new MutationBatch(ml, false);
        try {
            mutQueue.put(mb);
            mb.cdl.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Queues the given mutations and returns a future that completes once the flush thread has
     * flushed them. An empty collection yields an already completed future.
     *
     * @param ml the mutations to write
     * @return a future that is done after the batch has been flushed
     * @throws RuntimeException wrapping {@link InterruptedException} if the calling thread is
     *         interrupted while queueing; the interrupt flag is restored first
     */
    ListenableFuture<Void> writeMutationsAsyncFuture(Collection<Mutation> ml) {
        if (ml.isEmpty()) {
            return Futures.immediateFuture(null);
        }
        ListenableFutureTask<Void> lf = ListenableFutureTask.<Void>create(DO_NOTHING, null);
        try {
            mutQueue.put(new MutationBatch(ml, lf));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return lf;
    }

    /**
     * Single-mutation variant of {@link #writeMutationsAsyncFuture(Collection)}.
     *
     * @param m the mutation to write
     * @return a future that is done after the mutation has been flushed
     */
    ListenableFuture<Void> writeMutationsAsyncFuture(Mutation m) {
        return writeMutationsAsyncFuture(Collections.singleton(m));
    }

    /**
     * Queues the end sentinel and blocks until the flush thread has processed it. Batches queued
     * ahead of the sentinel are processed no later than the sentinel itself.
     *
     * <p>NOTE(review): this method does not close the underlying {@link BatchWriter}; presumably
     * its owner does - confirm against the caller.
     *
     * @throws RuntimeException wrapping {@link InterruptedException} if the calling thread is
     *         interrupted while queueing or waiting; the interrupt flag is restored first
     */
    void close() {
        try {
            mutQueue.put(end);
            end.cdl.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Queues the given mutations without waiting for them to be flushed. Use
     * {@link #waitForAsyncFlush()} to wait for completion. An empty list is a no-op, matching
     * the other write methods.
     *
     * @param ml the mutations to write
     * @throws RuntimeException wrapping {@link InterruptedException} if the calling thread is
     *         interrupted while queueing; the interrupt flag is restored first
     */
    void writeMutationsAsync(List<Mutation> ml) {
        if (ml.isEmpty()) {
            return;
        }
        MutationBatch mb = new MutationBatch(ml, true);
        // Count the batch BEFORE queueing it: a batch must never be flushed (and counted as
        // processed) ahead of its own "added" increment, or a concurrent waitForAsyncFlush()
        // could return before that waiter's batch has been written.
        asyncBatchesAdded.incrementAndGet();
        try {
            mutQueue.put(mb);
        }
        catch (InterruptedException e) {
            // The batch never reached the queue. Undo the count, otherwise the added counter
            // stays ahead of the processed counter for good and waitForAsyncFlush() can no
            // longer be satisfied.
            asyncBatchesAdded.decrementAndGet();
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Single-mutation variant of {@link #writeMutationsAsync(List)}.
     *
     * @param m the mutation to write
     */
    void writeMutationAsync(Mutation m) {
        writeMutationsAsync(Collections.singletonList(m));
    }

    /**
     * Blocks until the number of flushed fire-and-forget batches has reached the number that had
     * been submitted through {@link #writeMutationsAsync(List)} at the time of this call.
     *
     * @throws RuntimeException wrapping {@link InterruptedException} if the calling thread is
     *         interrupted while waiting; the interrupt flag is restored first
     */
    public void waitForAsyncFlush() {
        long numAdded = asyncBatchesAdded.get();
        synchronized (this) {
            // Loop guards against spurious wake-ups and against notifications for other batches.
            while (numAdded > asyncBatchesProcessed) {
                try {
                    wait();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * Background loop that drains the queue, writes and flushes the drained batches, and releases
     * the callers waiting on them.
     */
    private class FlushTask
    implements Runnable {

        /**
         * Runs until the end sentinel has been processed successfully.
         *
         * <p>NOTE(review): when processing throws, the exception is only printed and the current
         * group of batches is kept, so the same batches (plus anything newly queued) are handed
         * to the writer again on the next iteration. Callers blocked on those batches keep
         * waiting until an attempt succeeds.
         */
        @Override
        public void run() {
            boolean keepRunning = true;
            List<MutationBatch> batches = new ArrayList<MutationBatch>();
            while (keepRunning || !batches.isEmpty()) {
                try {
                    if (batches.isEmpty()) {
                        // Block until there is work; skipped when retrying a failed group.
                        batches.add(mutQueue.take());
                    }
                    // Pick up everything else already queued so one flush covers it all.
                    mutQueue.drainTo(batches);
                    processBatches(batches);
                    for (MutationBatch batch : batches) {
                        if (batch == end) {
                            keepRunning = false;
                        }
                    }
                    batches.clear();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Adds the mutations of every batch to the writer, flushes once, then signals every batch
         * and advances the processed counter for the fire-and-forget ones.
         *
         * @param batches the batches drained from the queue; may contain the end sentinel
         * @throws MutationsRejectedException if the writer rejects mutations on add or flush
         */
        private void processBatches(List<MutationBatch> batches) throws MutationsRejectedException {
            for (MutationBatch batch : batches) {
                if (batch != end) {
                    bw.addMutations(batch.mutations);
                }
            }
            bw.flush();

            int numAsync = 0;
            for (MutationBatch batch : batches) {
                // Also signals the end sentinel, which is what releases close().
                batch.countDown();
                if (batch.isAsync) {
                    ++numAsync;
                }
            }
            if (numAsync > 0) {
                synchronized (SharedBatchWriter.this) {
                    asyncBatchesProcessed += numAsync;
                    SharedBatchWriter.this.notifyAll();
                }
            }
        }
    }

    /**
     * A group of mutations queued together, plus the optional handle used to tell the submitter
     * that the group has been flushed.
     */
    private static class MutationBatch {
        private final Collection<Mutation> mutations;
        /** Latch for blocking submitters; {@code null} for async and future-based batches. */
        private final CountDownLatch cdl;
        /** True only for fire-and-forget batches tracked by the async counters. */
        private final boolean isAsync;
        /** Future task for future-based submitters; {@code null} otherwise. */
        private final ListenableFutureTask<Void> lf;

        /**
         * Creates a blocking batch (with a latch) or a fire-and-forget batch (without one).
         *
         * @param mutations the mutations of this batch
         * @param isAsync {@code true} for fire-and-forget, {@code false} for blocking
         */
        public MutationBatch(Collection<Mutation> mutations, boolean isAsync) {
            this.mutations = mutations;
            this.isAsync = isAsync;
            this.cdl = isAsync ? null : new CountDownLatch(1);
            this.lf = null;
        }

        /**
         * Creates a future-based batch.
         *
         * @param mutations the mutations of this batch
         * @param lf the task to run once the batch has been flushed
         */
        public MutationBatch(Collection<Mutation> mutations, ListenableFutureTask<Void> lf) {
            this.mutations = mutations;
            this.lf = lf;
            this.cdl = null;
            this.isAsync = false;
        }

        /** Signals completion: releases the latch and/or runs the future task, if present. */
        public void countDown() {
            if (cdl != null) {
                cdl.countDown();
            }
            if (lf != null) {
                lf.run();
            }
        }
    }
}

