/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.logging.messaging;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.blitz4j.BlitzConfig;
import com.netflix.blitz4j.LoggingConfiguration;
import com.netflix.logging.messaging.MessageProcessor;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.servo.monitor.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Buffers messages in a bounded queue and hands them, in batches, to a
 * {@link MessageProcessor} running on a thread pool.
 *
 * <p>A dedicated daemon {@code Collector} thread drains the queue into a batch
 * until either {@code maxMessages} messages were gathered or {@code maxDelay}
 * nanoseconds elapsed, then submits the batch to the processor pool.
 *
 * <p>NOTE(review): {@link #maxDelay} is a {@code static} field, so the value
 * set through {@link #setBatchMaxDelay(double)} is shared by every batcher
 * instance. It is kept static here to preserve the existing protected
 * interface.
 *
 * @param <T> type of the messages accepted by this batcher
 */
public class MessageBatcher<T> {
    private static final BlitzConfig CONFIGURATION = LoggingConfiguration.getInstance().getConfiguration();
    private static final String DOT = ".";
    private static final String BATCHER_PREFIX = "batcher.";
    private static final String COLLECTOR_SUFFIX = ".collector";
    // Written by stop() on the caller thread, polled by the collector thread.
    private volatile boolean shouldCollectorShutdown;
    // Replaced by the collector thread after each submit, read by stop().
    volatile List<Object> batch;
    protected String name;
    protected BlockingQueue queue;
    protected int maxMessages;
    // Maximum time, in nanoseconds, the collector waits to fill a batch. Shared by all instances.
    protected static long maxDelay;
    protected Collector collector;
    protected ThreadPoolExecutor processor;
    protected MessageProcessor target;
    protected final AtomicInteger concurrentBatches;
    protected Timer queueSizeTracer;
    protected Timer batchSyncPutTracer;
    protected Timer threadSubmitTracer;
    protected Timer processTimeTracer;
    protected Timer avgBatchSizeTracer;
    protected Counter queueOverflowCounter;
    private volatile boolean isShutDown;
    private AtomicLong numberAdded;
    private AtomicLong numberDropped;
    private boolean blockingProperty;
    // Written by pause()/resume() on caller threads, polled by the collector thread.
    private volatile boolean isCollectorPaused;
    private Counter processCount;
    public static final String POOL_MAX_THREADS = "maxThreads";
    public static final String POOL_MIN_THREADS = "minThreads";
    public static final String POOL_KEEP_ALIVE_TIME = "keepAliveTime";

    /**
     * Creates the batcher, its queue, processor pool and monitors, and starts
     * the collector thread.
     *
     * @param name   logical name; prefixed with {@code "batcher."} and used to
     *               look up configuration and to name threads and monitors
     * @param target processor that receives each collected batch
     */
    public MessageBatcher(String name, MessageProcessor target) {
        this.concurrentBatches = new AtomicInteger(0);
        this.numberAdded = new AtomicLong();
        this.numberDropped = new AtomicLong();
        this.name = BATCHER_PREFIX + name;
        this.target = target;
        this.queue = new ArrayBlockingQueue(CONFIGURATION.getBatcherQueueMaxMessages(this.name));
        this.setBatchMaxMessages(CONFIGURATION.getBatchSize(this.name));
        this.batch = new ArrayList<Object>(this.maxMessages);
        this.setBatchMaxDelay(CONFIGURATION.getBatcherMaxDelay(this.name));
        this.collector = new Collector(this, this.name + COLLECTOR_SUFFIX);
        this.createProcessor(this.name);
        this.queueSizeTracer = Monitors.newTimer("queue_size");
        this.batchSyncPutTracer = Monitors.newTimer("waitTimeforBuffer");
        this.avgBatchSizeTracer = Monitors.newTimer("batch_size");
        this.processCount = Monitors.newCounter("messages_processed");
        this.threadSubmitTracer = Monitors.newTimer("thread_invocation_time");
        this.processTimeTracer = Monitors.newTimer("message_processTime");
        this.queueOverflowCounter = Monitors.newCounter("queue_overflow");
        this.blockingProperty = CONFIGURATION.shouldWaitWhenBatcherQueueNotEmpty(this.name);
        // The collector is started only after every tracer it reads has been assigned.
        this.collector.setDaemon(true);
        this.collector.start();
        try {
            Monitors.registerObject(this.name, this);
        }
        catch (Throwable e) {
            // Monitor registration is best-effort; a failure must not prevent batching.
            if (CONFIGURATION.shouldPrintLoggingErrors()) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Replaces the processor that receives collected batches.
     *
     * @param target the new batch processor
     */
    public synchronized void setTarget(MessageProcessor target) {
        this.target = target;
    }

    /**
     * Sets the maximum number of messages gathered into one batch.
     *
     * @param maxMessages maximum batch size
     */
    public synchronized void setBatchMaxMessages(int maxMessages) {
        this.maxMessages = maxMessages;
    }

    /**
     * Sets the maximum time the collector waits while filling a batch.
     * The value is stored, in nanoseconds, in a static field and therefore
     * applies to every batcher instance.
     *
     * @param maxDelaySec maximum delay in seconds
     */
    public synchronized void setBatchMaxDelay(double maxDelaySec) {
        maxDelay = (long)(maxDelaySec * 1.0E9);
    }

    /**
     * Sets the maximum size of the processor thread pool, lowering the core
     * size first when it would otherwise exceed the new maximum.
     *
     * @param maxThreads new maximum pool size
     */
    public void setProcessorMaxThreads(int maxThreads) {
        if (this.processor.getCorePoolSize() > maxThreads) {
            this.processor.setCorePoolSize(maxThreads);
        }
        this.processor.setMaximumPoolSize(maxThreads);
    }

    /**
     * @return {@code true} when the queue has remaining capacity
     */
    public boolean isSpaceAvailable() {
        return this.queue.remainingCapacity() > 0;
    }

    /**
     * Offers a message to the queue without blocking.
     *
     * @param message the message to enqueue
     * @return {@code true} if the message was queued; {@code false} if the
     *         batcher is shut down or the queue is full (the drop is counted)
     */
    public boolean process(T message) {
        if (this.isShutDown) {
            return false;
        }
        this.recordQueueSize();
        if (!this.queue.offer(message)) {
            this.numberDropped.incrementAndGet();
            this.queueOverflowCounter.increment();
            return false;
        }
        this.numberAdded.incrementAndGet();
        return true;
    }

    /**
     * Puts a message on the queue, blocking while the queue is full.
     * Returns without queuing when the batcher is shut down or the calling
     * thread is interrupted; in the latter case the interrupt flag is restored.
     *
     * @param message the message to enqueue
     */
    public void processSync(T message) {
        if (this.isShutDown) {
            return;
        }
        this.recordQueueSize();
        Stopwatch s = this.batchSyncPutTracer.start();
        try {
            this.queue.put(message);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return;
        }
        finally {
            s.stop();
        }
        this.numberAdded.incrementAndGet();
    }

    /**
     * Offers every message of the list without blocking, stopping early once
     * the batcher is shut down.
     *
     * @param objects messages to enqueue
     */
    public void process(List<T> objects) {
        for (T message : objects) {
            if (this.isShutDown) {
                return;
            }
            this.process(message);
        }
    }

    /**
     * Enqueues every message of the list, stopping early once the batcher is
     * shut down.
     *
     * @param objects messages to enqueue
     * @param sync    {@code true} to block on a full queue
     *                ({@link #processSync(Object)}), {@code false} to drop on
     *                a full queue ({@link #process(Object)})
     */
    public void process(List<T> objects, boolean sync) {
        for (T message : objects) {
            if (this.isShutDown) {
                return;
            }
            if (sync) {
                this.processSync(message);
            } else {
                this.process(message);
            }
        }
    }

    /**
     * Pauses the collector thread; has no effect after shutdown.
     */
    public void pause() {
        if (!this.isShutDown) {
            this.isCollectorPaused = true;
        }
    }

    /**
     * @return {@code true} while the collector is paused
     */
    public boolean isPaused() {
        return this.isCollectorPaused;
    }

    /**
     * Resumes a paused collector thread; has no effect after shutdown.
     */
    public void resume() {
        if (!this.isShutDown) {
            this.isCollectorPaused = false;
        }
    }

    /**
     * Stops accepting messages, waits up to the configured time for the queue
     * and the current batch to empty, then signals the collector to exit and
     * shuts the processor pool down.
     */
    public void stop() {
        this.isShutDown = true;
        int waitTimeinMillis = CONFIGURATION.getBatcherWaitTimeBeforeShutdown(this.name);
        long deadline = (long)waitTimeinMillis + System.currentTimeMillis();
        while (this.queue.size() > 0 || this.batch.size() > 0) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0L) {
                break;
            }
            try {
                // Never sleep past the deadline.
                Thread.sleep(Math.min(1000L, remaining));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        try {
            this.shouldCollectorShutdown = true;
            this.processor.shutdownNow();
        }
        catch (Throwable e) {
            e.printStackTrace();
        }
    }

    /**
     * @return the number of messages currently queued, or 0 when no queue exists
     */
    @Monitor(name="batcherQueueSize", type=DataSourceType.GAUGE)
    public int getSize() {
        if (this.queue != null) {
            return this.queue.size();
        }
        return 0;
    }

    /**
     * Resets the count of accepted messages to zero.
     */
    public void resetNumberAdded() {
        this.numberAdded.set(0L);
    }

    /**
     * Resets the count of dropped messages to zero.
     */
    public void resetNumberDropped() {
        this.numberDropped.set(0L);
    }

    /**
     * @return the number of messages accepted since creation or the last reset
     */
    @Monitor(name="numberAdded", type=DataSourceType.GAUGE)
    public long getNumberAdded() {
        return this.numberAdded.get();
    }

    /**
     * @return the number of messages dropped since creation or the last reset
     */
    @Monitor(name="numberDropped", type=DataSourceType.GAUGE)
    public long getNumberDropped() {
        return this.numberDropped.get();
    }

    /**
     * @return the blocking flag read from configuration at construction time
     */
    @Monitor(name="blocking", type=DataSourceType.INFORMATIONAL)
    public boolean isBlocking() {
        return this.blockingProperty;
    }

    /**
     * Records the current queue size; tracing failures are deliberately
     * ignored so that they can never prevent a message from being queued.
     */
    private void recordQueueSize() {
        try {
            this.queueSizeTracer.record((long)this.queue.size());
        }
        catch (Throwable ignored) {
            // best-effort metric only
        }
    }

    /**
     * Builds the processor pool from configuration. Tasks are handed off
     * through a {@link SynchronousQueue}; unless configured to reject when all
     * threads are busy, the submitting thread runs the task itself.
     *
     * @param name configuration key and thread-name prefix
     */
    private void createProcessor(String name) {
        int minThreads = CONFIGURATION.getBatcherMinThreads(name);
        int maxThreads = CONFIGURATION.getBatcherMaxThreads(name);
        int keepAliveTime = CONFIGURATION.getBatcherThreadKeepAliveTime(name);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(name + "-process").build();
        this.processor = new ThreadPoolExecutor(minThreads, maxThreads, (long)keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);
        if (!CONFIGURATION.shouldRejectWhenAllBatcherThreadsUsed(name)) {
            this.processor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        }
    }

    /**
     * Thread that drains the queue into batches and submits each batch to the
     * processor pool.
     */
    private class Collector
    extends Thread {
        private static final int SLEEP_TIME_MS = 1;
        private Timer processTimeTracer;
        private Counter rejectedCounter;
        private static final int RETRY_EXECUTION_TIMEOUT_MS = 1;
        private final MessageBatcher stream;
        private final Timer queueSizeTracer;

        /**
         * @param stream the batcher whose queue is drained
         * @param name   thread name, also used as prefix for this thread's monitors
         */
        public Collector(MessageBatcher stream, String name) {
            super(name);
            // Named after the collector; the enclosing batcher's counters are not yet assigned here.
            this.rejectedCounter = Monitors.newCounter(name + ".rejected");
            this.processTimeTracer = Monitors.newTimer(name + ".processTime");
            this.stream = stream;
            this.queueSizeTracer = Monitors.newTimer(name + ".queue_size_at_drain");
        }

        /**
         * Collects and submits batches until shutdown is requested. Any
         * failure inside one iteration is reported (if configured) and the
         * loop carries on.
         */
        @Override
        public void run() {
            while (!MessageBatcher.this.shouldCollectorShutdown) {
                if (MessageBatcher.this.isCollectorPaused) {
                    try {
                        Thread.sleep(SLEEP_TIME_MS);
                    }
                    catch (InterruptedException ignored) {
                        // Deliberately ignored: the loop re-checks the pause and shutdown flags.
                    }
                    continue;
                }
                try {
                    List<Object> current = MessageBatcher.this.batch;
                    this.fillBatch(current);
                    int batchSize = current.size();
                    if (batchSize <= 0) {
                        continue;
                    }
                    try {
                        this.queueSizeTracer.record((long)this.stream.queue.size());
                    }
                    catch (Exception ignored) {
                        // best-effort metric only
                    }
                    MessageBatcher.this.avgBatchSizeTracer.record((long)batchSize);
                    Stopwatch s = this.processTimeTracer.start();
                    boolean submitted;
                    try {
                        submitted = this.submitBatch(current);
                    }
                    finally {
                        s.stop();
                    }
                    if (!submitted) {
                        // Shutdown was requested while retrying; the loop condition ends the thread.
                        continue;
                    }
                    MessageBatcher.this.processCount.increment((long)batchSize);
                    // The submitted list now belongs to the processing task; start a fresh one.
                    MessageBatcher.this.batch = new ArrayList<Object>(this.stream.maxMessages);
                }
                catch (Throwable e) {
                    if (CONFIGURATION.shouldPrintLoggingErrors()) {
                        e.printStackTrace();
                    }
                }
            }
        }

        /**
         * Fills the given batch from the queue until it holds
         * {@code maxMessages} entries, the queue stays empty for the rest of
         * the {@code maxDelay} window, or the wait is interrupted.
         */
        private void fillBatch(List<Object> current) {
            if (current.size() >= this.stream.maxMessages) {
                return;
            }
            long firstTime = System.nanoTime();
            long now = firstTime;
            do {
                if (this.stream.queue.drainTo(current, this.stream.maxMessages - current.size()) <= 0) {
                    long maxWait = firstTime + maxDelay - now;
                    if (maxWait <= 0L) {
                        break;
                    }
                    Object nextMessage = null;
                    try {
                        nextMessage = this.stream.queue.poll(maxWait, TimeUnit.NANOSECONDS);
                    }
                    catch (InterruptedException ignored) {
                        // Treated like a timeout: submit whatever was collected so far.
                    }
                    if (nextMessage == null) {
                        break;
                    }
                    current.add(nextMessage);
                    now = System.nanoTime();
                }
            } while (current.size() < this.stream.maxMessages);
        }

        /**
         * Submits the batch to the processor pool, retrying while the pool
         * rejects it.
         *
         * @return {@code true} once the pool accepted the batch; {@code false}
         *         if shutdown was requested while the pool kept rejecting it
         * @throws InterruptedException if interrupted while waiting to retry
         */
        private boolean submitBatch(List<Object> pending) throws InterruptedException {
            while (true) {
                try {
                    this.stream.processor.execute(new ProcessMessages(this.stream, pending));
                    return true;
                }
                catch (RejectedExecutionException re) {
                    this.rejectedCounter.increment();
                    // A shut-down pool rejects forever; stop retrying so this thread can exit.
                    if (MessageBatcher.this.shouldCollectorShutdown) {
                        return false;
                    }
                    Thread.sleep(RETRY_EXECUTION_TIMEOUT_MS);
                }
            }
        }
    }

    /**
     * Task that passes one collected batch to the batcher's target processor
     * while tracking the number of concurrently running batches.
     */
    private static class ProcessMessages
    implements Runnable {
        private final MessageBatcher stream;
        private List batch;
        private Timer processMessagesTracer;
        private Timer avgConcurrentBatches;

        /**
         * @param stream the owning batcher
         * @param batch  the messages to process; may be {@code null}, in which
         *               case {@link #run()} does nothing
         */
        public ProcessMessages(MessageBatcher stream, List batch) {
            this.stream = stream;
            this.batch = batch;
            this.processMessagesTracer = stream.processTimeTracer;
            this.avgConcurrentBatches = Monitors.newTimer(stream.name + ".concurrentBatches");
        }

        /**
         * Processes the batch. Failures are printed and never propagate to
         * the executor thread.
         */
        @Override
        public void run() {
            try {
                if (this.batch == null) {
                    return;
                }
                int inProcess = this.stream.concurrentBatches.incrementAndGet();
                try {
                    this.avgConcurrentBatches.record((long)inProcess);
                    Stopwatch s = this.processMessagesTracer.start();
                    try {
                        this.stream.target.process(this.batch);
                    }
                    finally {
                        s.stop();
                    }
                }
                finally {
                    this.stream.concurrentBatches.decrementAndGet();
                }
            }
            catch (Throwable e) {
                e.printStackTrace();
            }
        }
    }
}

