/*
 * Decompiled with CFR 0.152.
 */
package kafka.producer.async;

import com.yammer.metrics.core.Gauge;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kafka.metrics.KafkaMetricsGroup;
import kafka.metrics.KafkaMetricsGroup$class;
import kafka.producer.KeyedMessage;
import kafka.producer.async.EventHandler;
import kafka.producer.async.IllegalQueueStateException;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import kafka.utils.Logging$class;
import kafka.utils.SystemTime$;
import org.apache.log4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.ScalaObject;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.StringBuilder;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.AbstractFunction0;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.ObjectRef;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
@ScalaSignature(bytes="\u0006\u0001\u0005\u0005c\u0001B\u0001\u0003\u0001%\u0011!\u0003\u0015:pIV\u001cWM]*f]\u0012$\u0006N]3bI*\u00111\u0001B\u0001\u0006CNLhn\u0019\u0006\u0003\u000b\u0019\t\u0001\u0002\u001d:pIV\u001cWM\u001d\u0006\u0002\u000f\u0005)1.\u00194lC\u000e\u0001Qc\u0001\u0006C\u0019N)\u0001aC\n\u001a?A\u0011A\"E\u0007\u0002\u001b)\u0011abD\u0001\u0005Y\u0006twMC\u0001\u0011\u0003\u0011Q\u0017M^1\n\u0005Ii!A\u0002+ie\u0016\fG\r\u0005\u0002\u0015/5\tQC\u0003\u0002\u0017\r\u0005)Q\u000f^5mg&\u0011\u0001$\u0006\u0002\b\u0019><w-\u001b8h!\tQR$D\u0001\u001c\u0015\tab!A\u0004nKR\u0014\u0018nY:\n\u0005yY\"!E&bM.\fW*\u001a;sS\u000e\u001cxI]8vaB\u0011\u0001eI\u0007\u0002C)\t!%A\u0003tG\u0006d\u0017-\u0003\u0002%C\tY1kY1mC>\u0013'.Z2u\u0011!1\u0003A!b\u0001\n\u00039\u0013A\u0003;ie\u0016\fGMT1nKV\t\u0001\u0006\u0005\u0002*Y9\u0011\u0001EK\u0005\u0003W\u0005\na\u0001\u0015:fI\u00164\u0017BA\u0017/\u0005\u0019\u0019FO]5oO*\u00111&\t\u0005\ta\u0001\u0011\t\u0011)A\u0005Q\u0005YA\u000f\u001b:fC\u0012t\u0015-\\3!\u0011!\u0011\u0004A!b\u0001\n\u0003\u0019\u0014!B9vKV,W#\u0001\u001b\u0011\u0007URD(D\u00017\u0015\t9\u0004(\u0001\u0006d_:\u001cWO\u001d:f]RT!!O\b\u0002\tU$\u0018\u000e\\\u0005\u0003wY\u0012QB\u00117pG.LgnZ)vKV,\u0007\u0003B\u001f?\u0001.k\u0011\u0001B\u0005\u0003\u007f\u0011\u0011AbS3zK\u0012lUm]:bO\u0016\u0004\"!\u0011\"\r\u0001\u0011)1\t\u0001b\u0001\t\n\t1*\u0005\u0002F\u0011B\u0011\u0001ER\u0005\u0003\u000f\u0006\u0012qAT8uQ&tw\r\u0005\u0002!\u0013&\u0011!*\t\u0002\u0004\u0003:L\bCA!M\t\u0015i\u0005A1\u0001E\u0005\u00051\u0006\u0002C(\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001b\u0002\rE,X-^3!\u0011!\t\u0006A!b\u0001\n\u0003\u0011\u0016a\u00025b]\u0012dWM]\u000b\u0002'B!A+\u0016!L\u001b\u0005\u0011\u0011B\u0001,\u0003\u00051)e/\u001a8u\u0011\u0006tG\r\\3s\u0011!A\u0006A!A!\u0002\u0013\u0019\u0016\u0001\u00035b]\u0012dWM\u001d\u0011\t\u0011i\u0003!Q1A\u0005\u0002m\u000b\u0011\"];fk\u0016$\u0016.\\3\u0016\u0003q\u0003\"\u0001I/\n\u0005y\u000b#\u0001\u00
02'p]\u001eD\u0001\u0002\u0019\u0001\u0003\u0002\u0003\u0006I\u0001X\u0001\u000bcV,W/\u001a+j[\u0016\u0004\u0003\u0002\u00032\u0001\u0005\u000b\u0007I\u0011A2\u0002\u0013\t\fGo\u00195TSj,W#\u00013\u0011\u0005\u0001*\u0017B\u00014\"\u0005\rIe\u000e\u001e\u0005\tQ\u0002\u0011\t\u0011)A\u0005I\u0006Q!-\u0019;dQNK'0\u001a\u0011\t\u0011)\u0004!Q1A\u0005\u0002\u001d\n\u0001b\u00197jK:$\u0018\n\u001a\u0005\tY\u0002\u0011\t\u0011)A\u0005Q\u0005I1\r\\5f]RLE\r\t\u0005\u0006]\u0002!\ta\\\u0001\u0007y%t\u0017\u000e\u001e \u0015\u000fA\f(o\u001d;vmB!A\u000b\u0001!L\u0011\u00151S\u000e1\u0001)\u0011\u0015\u0011T\u000e1\u00015\u0011\u0015\tV\u000e1\u0001T\u0011\u0015QV\u000e1\u0001]\u0011\u0015\u0011W\u000e1\u0001e\u0011\u0015QW\u000e1\u0001)\u0011\u001dA\bA1A\u0005\ne\fQb\u001d5vi\u0012|wO\u001c'bi\u000eDW#\u0001>\u0011\u0005UZ\u0018B\u0001?7\u00059\u0019u.\u001e8u\t><h\u000eT1uG\"DaA \u0001!\u0002\u0013Q\u0018AD:ikR$wn\u001e8MCR\u001c\u0007\u000e\t\u0005\n\u0003\u0003\u0001!\u0019!C\u0005\u0003\u0007\tqb\u001d5vi\u0012|wO\\\"p[6\fg\u000eZ\u000b\u0002y!9\u0011q\u0001\u0001!\u0002\u0013a\u0014\u0001E:ikR$wn\u001e8D_6l\u0017M\u001c3!\u0011\u001d\tY\u0001\u0001C!\u0003\u001b\t1A];o)\t\ty\u0001E\u0002!\u0003#I1!a\u0005\"\u0005\u0011)f.\u001b;\t\u000f\u0005]\u0001\u0001\"\u0001\u0002\u001a\u0005A1\u000f[;uI><h.\u0006\u0002\u0002\u0010!9\u0011Q\u0004\u0001\u0005\n\u00055\u0011!\u00049s_\u000e,7o]#wK:$8\u000fC\u0004\u0002\"\u0001!\t!a\t\u0002\u0017Q\u0014\u0018\u0010V8IC:$G.\u001a\u000b\u0005\u0003\u001f\t)\u0003\u0003\u0005\u0002(\u0005}\u0001\u0019AA\u0015\u0003\u0019)g/\u001a8ugB)\u00111FA\u001ey9!\u0011QFA\u001c\u001d\u0011\ty#!\u000e\u000e\u0005\u0005E\"bAA\u001a\u0011\u00051AH]8pizJ\u0011AI\u0005\u0004\u0003s\t\u0013a\u00029bG.\fw-Z\u0005\u0005\u0003{\tyDA\u0002TKFT1!!\u000f\"\u0001")
public class ProducerSendThread<K, V>
extends Thread
implements Logging,
KafkaMetricsGroup,
ScalaObject {
    // Name passed to the Thread superclass constructor.
    private final String threadName;
    // Queue of pending messages: polled by processEvents(), written to by shutdown().
    private final BlockingQueue<KeyedMessage<K, V>> queue;
    // Receives each batch dispatched by tryToHandle().
    private final EventHandler<K, V> handler;
    // Added to the last-send timestamp to bound the poll timeout in processEvents(); used there as milliseconds.
    private final long queueTime;
    // A batch is dispatched once the buffered event count reaches this value.
    private final int batchSize;
    // Value of the "clientId" tag on the ProducerQueueSize gauge registered in the constructor.
    private final String clientId;
    // Counted down when run() finishes; shutdown() blocks on it.
    private final CountDownLatch shutdownLatch;
    // Sentinel message: processEvents() stops draining when it dequeues this exact instance.
    private final KeyedMessage<K, V> kafka$producer$async$ProducerSendThread$$shutdownCommand;
    // The fields below are never assigned in this class body. Presumably they back the Logging
    // trait initialised through Logging$class.$init$ -- TODO confirm against kafka.utils.Logging.
    private final String loggerName;
    private final Logger logger;
    private String logIdent;
    private final Log4jController$ kafka$utils$Logging$$log4jController;
    // NOTE(review): looks like the Scala compiler's lazy-val initialisation bitmap -- confirm.
    public volatile int bitmap$0;

    /** Returns the thread name supplied at construction. */
    public String threadName() {
        return threadName;
    }

    /** Returns the queue this thread drains. */
    public BlockingQueue<KeyedMessage<K, V>> queue() {
        return queue;
    }

    /** Returns the handler that receives each dispatched batch. */
    public EventHandler<K, V> handler() {
        return handler;
    }

    /** Returns the queue-time bound supplied at construction. */
    public long queueTime() {
        return queueTime;
    }

    /** Returns the event count at which a batch is dispatched. */
    public int batchSize() {
        return batchSize;
    }

    /** Returns the client id supplied at construction. */
    public String clientId() {
        return clientId;
    }

    /** Returns the latch that is counted down when {@code run()} finishes. */
    private CountDownLatch shutdownLatch() {
        return shutdownLatch;
    }

    /** Returns the sentinel message whose dequeue ends the event loop. */
    public final KeyedMessage<K, V> kafka$producer$async$ProducerSendThread$$shutdownCommand() {
        return kafka$producer$async$ProducerSendThread$$shutdownCommand;
    }

    @Override
    public void run() {
        try {
            this.processEvents();
            this.shutdownLatch().countDown();
        }
        catch (Throwable throwable) {
            this.error((Function0<String>)new Serializable(this){
                public static final long serialVersionUID;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final String apply() {
                    return "Error in sending events: ";
                }
            }, (Function0<Throwable>)new Serializable(this, throwable){
                public static final long serialVersionUID;
                private final Throwable e$1;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final Throwable apply() {
                    return this.e$1;
                }
                {
                    this.e$1 = producerSendThread;
                }
            });
        }
        return;
        {
            finally {
                this.shutdownLatch().countDown();
            }
        }
    }

    /**
     * Requests shutdown and blocks until the send thread has finished.
     *
     * Puts the shutdown sentinel on the queue, then waits on the latch that {@code run()} counts
     * down. An info message is logged before and after.
     *
     * NOTE(review): {@code BlockingQueue.put} and {@code CountDownLatch.await} both declare the
     * checked {@code InterruptedException}, which this method neither catches nor declares. This
     * is decompiled Scala, where checked exceptions are not enforced; javac would reject it as
     * written.
     */
    public void shutdown() {
        // Decompiler rendering of a by-name log message (a compiled Scala closure).
        this.info((Function0<String>)new Serializable(this){
            public static final long serialVersionUID;

            static {
                long l = serialVersionUID = 0L;
            }

            public final String apply() {
                return "Begin shutting down ProducerSendThread";
            }
        });
        // Enqueue the sentinel; processEvents() stops once it dequeues this exact instance.
        this.queue().put(this.kafka$producer$async$ProducerSendThread$$shutdownCommand());
        // Wait for run() to count the latch down.
        this.shutdownLatch().await();
        this.info((Function0<String>)new Serializable(this){
            public static final long serialVersionUID;

            static {
                long l = serialVersionUID = 0L;
            }

            public final String apply() {
                return "Shutdown ProducerSendThread complete";
            }
        });
    }

    /**
     * Event loop: repeatedly polls the queue, buffers dequeued messages, and hands the buffer to
     * {@code tryToHandle} whenever it holds at least {@code batchSize} events or a poll returns
     * {@code null}. The loop ends when the shutdown sentinel is dequeued; whatever is still
     * buffered is then passed to {@code tryToHandle} one last time.
     *
     * NOTE(review): this is decompiler output for a
     * {@code Stream.continually(...).takeWhile(...).foreach(...)} pipeline. The three anonymous
     * {@code Serializable} bodies are the pipeline's closures. Names such as
     * {@code producerSendThread}, {@code var2_2}, {@code var3_3}, {@code var4_4} and {@code l} in
     * their initializer blocks are not declared anywhere visible; they appear to be decompiler
     * placeholders for captured constructor arguments -- confirm against the Scala source.
     *
     * @throws IllegalQueueStateException if the queue is non-empty after the final flush
     */
    private void processEvents() {
        // Timestamp taken at start and after each dispatch; held in a ref so the closures can update it.
        LongRef lastSend$1 = new LongRef(SystemTime$.MODULE$.milliseconds());
        // Current batch buffer; replaced with a fresh ArrayBuffer after every dispatch.
        ObjectRef events$1 = new ObjectRef((Object)new ArrayBuffer());
        // Set on every iteration to whether the buffer has reached batchSize.
        BooleanRef full$1 = new BooleanRef(false);
        // Closure 1 (element supplier): one queue poll per stream element.
        scala.package$.MODULE$.Stream().continually((Function0)new Serializable(this, lastSend$1){
            public static final long serialVersionUID;
            private final ProducerSendThread $outer;
            private final LongRef lastSend$1;

            static {
                long l = serialVersionUID = 0L;
            }

            public final KeyedMessage<K, V> apply() {
                // Timeout is the time left until lastSend + queueTime, clamped at 0 ms.
                return this.$outer.queue().poll(package$.MODULE$.max(0L, this.lastSend$1.elem + this.$outer.queueTime() - SystemTime$.MODULE$.milliseconds()), TimeUnit.MILLISECONDS);
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.lastSend$1 = producerSendThread;
            }
        // Closure 2 (continue predicate): keep going for null items and for anything that is not the sentinel.
        }).takeWhile((Function1)new Serializable(this){
            public static final long serialVersionUID;
            private final ProducerSendThread $outer;

            static {
                long l = serialVersionUID = 0L;
            }

            public final boolean apply(KeyedMessage<K, V> item) {
                // Reference comparison: only the sentinel instance itself ends the stream.
                return item == null ? true : item != this.$outer.kafka$producer$async$ProducerSendThread$$shutdownCommand();
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        // Closure 3 (loop body): buffer the item, then dispatch if the batch is full or the poll returned null.
        }).foreach((Function1)new Serializable(this, lastSend$1, events$1, full$1){
            public static final long serialVersionUID;
            private final ProducerSendThread $outer;
            private final LongRef lastSend$1;
            private final ObjectRef events$1;
            private final BooleanRef full$1;

            static {
                long l = serialVersionUID = 0L;
            }

            public final void apply(KeyedMessage<K, V> currentQueueItem$1) {
                BoxedUnit boxedUnit;
                boolean expired;
                // Time since the last dispatch; used only in the debug message below.
                long elapsed$1 = SystemTime$.MODULE$.milliseconds() - this.lastSend$1.elem;
                // A null item is treated as "queue time reached".
                boolean bl = expired = currentQueueItem$1 == null;
                if (currentQueueItem$1 == null) {
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    this.$outer.trace((Function0<String>)new Serializable(this, currentQueueItem$1){
                        public static final long serialVersionUID;
                        private final KeyedMessage currentQueueItem$1;

                        static {
                            long l = serialVersionUID = 0L;
                        }

                        public final String apply() {
                            return Predef$.MODULE$.augmentString("Dequeued item for topic %s, partition key: %s, data: %s").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.currentQueueItem$1.topic(), this.currentQueueItem$1.key(), this.currentQueueItem$1.message()}));
                        }
                        {
                            this.currentQueueItem$1 = var2_2;
                        }
                    });
                    // Append the real message to the current batch.
                    boxedUnit = ((ArrayBuffer)this.events$1.elem).$plus$eq(currentQueueItem$1);
                }
                // Recompute the full flag after the possible append.
                boolean bl2 = this.full$1.elem = ((ArrayBuffer)this.events$1.elem).size() >= this.$outer.batchSize();
                if (this.full$1.elem || expired) {
                    if (expired) {
                        this.$outer.debug((Function0<String>)new Serializable(this, elapsed$1){
                            public static final long serialVersionUID;
                            private final long elapsed$1;

                            static {
                                long l = serialVersionUID = 0L;
                            }

                            public final String apply() {
                                return new StringBuilder().append(this.elapsed$1).append((Object)" ms elapsed. Queue time reached. Sending..").toString();
                            }
                            {
                                this.elapsed$1 = l;
                            }
                        });
                    }
                    if (this.full$1.elem) {
                        this.$outer.debug((Function0<String>)new Serializable(this){
                            public static final long serialVersionUID;

                            static {
                                long l = serialVersionUID = 0L;
                            }

                            public final String apply() {
                                return "Batch full. Sending..";
                            }
                        });
                    }
                    // Dispatch, then restart the timer and begin a new batch.
                    this.$outer.tryToHandle((ArrayBuffer)this.events$1.elem);
                    this.lastSend$1.elem = SystemTime$.MODULE$.milliseconds();
                    this.events$1.elem = new ArrayBuffer();
                }
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.lastSend$1 = producerSendThread;
                this.events$1 = var3_3;
                this.full$1 = var4_4;
            }
        });
        // The stream has ended, so flush whatever is still buffered.
        this.tryToHandle((Seq<KeyedMessage<K, V>>)((ArrayBuffer)events$1.elem));
        // Anything left in the queue at this point is reported as an invalid state.
        if (this.queue().size() > 0) {
            throw new IllegalQueueStateException(Predef$.MODULE$.augmentString("Invalid queue state! After queue shutdown, %d remaining items in the queue").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.queue().size())})));
        }
    }

    /**
     * Passes a batch of events to the handler, logging rather than propagating any failure.
     *
     * A debug message with the batch size is always logged. The handler is invoked only when the
     * batch is non-empty. Any {@code Throwable} raised inside the try block is caught and reported
     * through {@code error(...)} together with the batch size.
     *
     * NOTE(review): the names {@code n} and {@code producerSendThread} in the closure initializer
     * blocks are not declared anywhere visible; they appear to be decompiler placeholders for the
     * captured {@code size$1} and {@code throwable} -- confirm against the Scala source.
     *
     * @param events the batch to dispatch; may be empty
     */
    public void tryToHandle(Seq<KeyedMessage<K, V>> events) {
        // Read the size once, outside the try block, so both log closures can reuse it.
        int size$1 = events.size();
        try {
            this.debug((Function0<String>)new Serializable(this, size$1){
                public static final long serialVersionUID;
                private final int size$1;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final String apply() {
                    return new StringBuilder().append((Object)"Handling ").append((Object)BoxesRunTime.boxToInteger((int)this.size$1)).append((Object)" events").toString();
                }
                {
                    this.size$1 = n;
                }
            });
            // Empty batches are not forwarded to the handler.
            if (size$1 > 0) {
                this.handler().handle(events);
            }
        }
        catch (Throwable throwable) {
            // The failure is logged and swallowed; nothing is rethrown to the caller.
            this.error((Function0<String>)new Serializable(this, size$1){
                public static final long serialVersionUID;
                private final int size$1;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final String apply() {
                    return new StringBuilder().append((Object)"Error in handling batch of ").append((Object)BoxesRunTime.boxToInteger((int)this.size$1)).append((Object)" events").toString();
                }
                {
                    this.size$1 = n;
                }
            }, (Function0<Throwable>)new Serializable(this, throwable){
                public static final long serialVersionUID;
                private final Throwable e$2;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final Throwable apply() {
                    return this.e$2;
                }
                {
                    this.e$2 = producerSendThread;
                }
            });
        }
    }

    /**
     * Creates the send thread and registers a {@code ProducerQueueSize} gauge tagged with
     * {@code clientId}. The thread is not started here.
     *
     * @param threadName name given to the underlying {@link Thread}
     * @param queue      queue this thread drains
     * @param handler    receiver of each dispatched batch
     * @param queueTime  time in milliseconds after the last dispatch at which a queue poll gives
     *                   up and the buffered events are dispatched
     * @param batchSize  number of buffered events that triggers a dispatch
     * @param clientId   value of the {@code clientId} tag on the queue-size gauge
     */
    public ProducerSendThread(String threadName, BlockingQueue<KeyedMessage<K, V>> queue, EventHandler<K, V> handler, long queueTime, int batchSize, String clientId) {
        // Java requires the superclass constructor call to be the first statement.
        super(threadName);
        this.threadName = threadName;
        this.queue = queue;
        this.handler = handler;
        this.queueTime = queueTime;
        this.batchSize = batchSize;
        this.clientId = clientId;
        // Trait initialisers for the mixed-in Logging and KafkaMetricsGroup.
        Logging$class.$init$(this);
        KafkaMetricsGroup$class.$init$(this);
        this.shutdownLatch = new CountDownLatch(1);
        // Sentinel with no key or payload; the event loop recognises it by reference.
        this.kafka$producer$async$ProducerSendThread$$shutdownCommand = new KeyedMessage<K, V>("shutdown", null, null);
        this.newGauge("ProducerQueueSize", new Gauge<Object>(){
            @Override
            public Object value() {
                // Sampled on demand: the queue size at the moment the gauge is read.
                return Integer.valueOf(ProducerSendThread.this.queue().size());
            }
        }, (Map<String, String>)((Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef$.MODULE$.any2ArrowAssoc((Object)"clientId").$minus$greater((Object)clientId)}))));
    }
}

