/*
 * Decompiled with CFR 0.152.
 */
package kafka.producer;

import java.io.File;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Properties;
import kafka.admin.AdminUtils$;
import kafka.api.ProducerRequest;
import kafka.api.ProducerResponse;
import kafka.api.ProducerResponseStatus;
import kafka.common.ErrorMapping$;
import kafka.common.TopicAndPartition;
import kafka.integration.KafkaServerTestHarness;
import kafka.integration.KafkaServerTestHarness$class;
import kafka.message.ByteBufferMessageSet;
import kafka.message.CompressionCodec;
import kafka.message.Message;
import kafka.message.Message$;
import kafka.message.MessageSet$;
import kafka.message.NoCompressionCodec$;
import kafka.network.SocketServer;
import kafka.producer.SyncProducer;
import kafka.producer.SyncProducerConfig;
import kafka.producer.SyncProducerConfig$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import kafka.utils.SystemTime$;
import kafka.utils.TestUtils$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness$class;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.TraitSetter;

/*
 * Duplicate member names - consider using --renamedupmembers true
 */
@ScalaSignature(bytes="\u0006\u0001u3A!\u0001\u0002\u0001\u000f\t\u00012+\u001f8d!J|G-^2feR+7\u000f\u001e\u0006\u0003\u0007\u0011\t\u0001\u0002\u001d:pIV\u001cWM\u001d\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011c\u0001\u0001\t%A\u0011\u0011\u0002E\u0007\u0002\u0015)\u00111\u0002D\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0003\u001b9\t\u0011b]2bY\u0006$Xm\u001d;\u000b\u0003=\t1a\u001c:h\u0013\t\t\"B\u0001\u0006K+:LGoU;ji\u0016\u0004\"a\u0005\f\u000e\u0003QQ!!\u0006\u0003\u0002\u0017%tG/Z4sCRLwN\\\u0005\u0003/Q\u0011acS1gW\u0006\u001cVM\u001d<feR+7\u000f\u001e%be:,7o\u001d\u0005\u00063\u0001!\tAG\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003m\u0001\"\u0001\b\u0001\u000e\u0003\tAqA\b\u0001C\u0002\u0013%q$\u0001\u0007nKN\u001c\u0018mZ3CsR,7/F\u0001!!\r\tCEJ\u0007\u0002E)\t1%A\u0003tG\u0006d\u0017-\u0003\u0002&E\t)\u0011I\u001d:bsB\u0011\u0011eJ\u0005\u0003Q\t\u0012AAQ=uK\"1!\u0006\u0001Q\u0001\n\u0001\nQ\"\\3tg\u0006<WMQ=uKN\u0004\u0003\"\u0002\u0017\u0001\t\u0003i\u0013aD4f]\u0016\u0014\u0018\r^3D_:4\u0017nZ:\u0015\u00039\u00022a\f\u001b7\u001b\u0005\u0001$BA\u00193\u0003%IW.\\;uC\ndWM\u0003\u00024E\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005U\u0002$\u0001\u0002'jgR\u0004\"a\u000e\u001e\u000e\u0003aR!!\u000f\u0003\u0002\rM,'O^3s\u0013\tY\u0004HA\u0006LC\u001a\\\u0017mQ8oM&<\u0007\"B\u001f\u0001\t\u0003q\u0014a\u0005;fgR\u0014V-Y2iC\ndWmU3sm\u0016\u0014H#A 
\u0011\u0005\u0005\u0002\u0015BA!#\u0005\u0011)f.\u001b;)\u0005q\u001a\u0005C\u0001#G\u001b\u0005)%BA\u0006\u000f\u0013\t9UI\u0001\u0003UKN$\b\"B%\u0001\t\u0003q\u0014a\u0006;fgR,U\u000e\u001d;z!J|G-^2f%\u0016\fX/Z:uQ\tA5\tC\u0003M\u0001\u0011\u0005a(A\fuKN$X*Z:tC\u001e,7+\u001b>f)>|G*\u0019:hK\"\u00121j\u0011\u0005\u0006\u001f\u0002!\tAP\u0001#i\u0016\u001cH/T3tg\u0006<WmU5{KR{w\u000eT1sO\u0016<\u0016\u000e\u001e5BG.TVM]8)\u00059\u001b\u0005\"\u0002*\u0001\t\u0003q\u0014\u0001\n;fgR\u0004&o\u001c3vG\u0016\u001cuN\u001d:fGRd\u0017PU3dK&4Xm\u001d*fgB|gn]3)\u0005E\u001b\u0005\"B+\u0001\t\u0003q\u0014A\u0006;fgR\u0004&o\u001c3vG\u0016\u00148)\u00198US6,w.\u001e;)\u0005Q\u001b\u0005\"\u0002-\u0001\t\u0003q\u0014\u0001\t;fgR\u0004&o\u001c3vG\u0016\u0014V-];fgR<\u0016\u000e\u001e5O_J+7\u000f]8og\u0016D#aV\"\t\u000bm\u0003A\u0011\u0001 \u0002+Q,7\u000f\u001e(pi\u0016sw.^4i%\u0016\u0004H.[2bg\"\u0012!l\u0011")
public class SyncProducerTest
extends JUnitSuite
implements KafkaServerTestHarness {
    // Backing state for this test plus the members mixed in from KafkaServerTestHarness,
    // ZooKeeperTestHarness and Logging. Only messageBytes is assigned exclusively in the
    // constructor; every field that a generated setter or the lazy logger initializer writes
    // is declared non-final, because Java forbids assigning a final field outside a
    // constructor or initializer.

    /** Payload bytes used to build the small test messages. */
    private final byte[] messageBytes;
    /** Broker configurations held for the KafkaServerTestHarness accessors. */
    private Seq<KafkaConfig> instanceConfigs;
    /** Brokers held for the KafkaServerTestHarness accessors. */
    private Buffer<KafkaServer> servers;
    /** Broker list string held for the KafkaServerTestHarness accessors. */
    private String brokerList;
    /** Per-broker flags held for the KafkaServerTestHarness accessors. */
    private boolean[] alive;
    /** Embedded ZooKeeper instance held for the ZooKeeperTestHarness accessors. */
    private EmbeddedZookeeper zookeeper;
    /** ZooKeeper port held for the ZooKeeperTestHarness accessors. */
    private int zkPort;
    /** ZooKeeper helper passed to the topic-creation utilities in the tests. */
    private ZkUtils zkUtils;
    /** Written by kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq. */
    private int zkConnectionTimeout;
    /** Written by kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq. */
    private int zkSessionTimeout;
    /** Written by kafka$utils$Logging$_setter_$loggerName_$eq. */
    private String loggerName;
    /** Lazily created logger; written by logger$lzycompute once bitmap$0 is found unset. */
    private Logger logger;
    /** Log identifier exposed through logIdent()/logIdent_$eq. */
    private String logIdent;
    /** Written by kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq. */
    private Log4jController$ kafka$utils$Logging$$log4jController;
    /** Set to true once the lazy logger field has been initialized. */
    private volatile boolean bitmap$0;

    /**
     * Returns the broker configurations currently stored for the test harness.
     */
    @Override
    public Seq<KafkaConfig> instanceConfigs() {
        return this.instanceConfigs;
    }

    /**
     * Trait setter counterpart of {@link #instanceConfigs()}.
     *
     * @param x$1 the configurations to store
     */
    @Override
    @TraitSetter
    public void instanceConfigs_$eq(Seq<KafkaConfig> x$1) {
        this.instanceConfigs = x$1;
    }

    /**
     * Returns the buffer of brokers stored for the test harness; the tests use its head element.
     */
    @Override
    public Buffer<KafkaServer> servers() {
        return this.servers;
    }

    /**
     * Trait setter counterpart of {@link #servers()}.
     *
     * @param x$1 the broker buffer to store
     */
    @Override
    @TraitSetter
    public void servers_$eq(Buffer<KafkaServer> x$1) {
        this.servers = x$1;
    }

    /**
     * Returns the broker list string stored for the test harness.
     */
    @Override
    public String brokerList() {
        return this.brokerList;
    }

    /**
     * Trait setter counterpart of {@link #brokerList()}.
     *
     * @param x$1 the broker list string to store
     */
    @Override
    @TraitSetter
    public void brokerList_$eq(String x$1) {
        this.brokerList = x$1;
    }

    /**
     * Returns the boolean flags array stored for the test harness.
     * NOTE(review): presumably one liveness flag per broker - confirm against KafkaServerTestHarness.
     */
    @Override
    public boolean[] alive() {
        return this.alive;
    }

    /**
     * Trait setter counterpart of {@link #alive()}.
     *
     * @param x$1 the flags array to store (kept by reference, not copied)
     */
    @Override
    @TraitSetter
    public void alive_$eq(boolean[] x$1) {
        this.alive = x$1;
    }

    /**
     * Generated super-accessor used by the KafkaServerTestHarness trait: forwards to the
     * ZooKeeperTestHarness implementation of {@code setUp}.
     */
    @Override
    public /* synthetic */ void kafka$integration$KafkaServerTestHarness$$super$setUp() {
        ZooKeeperTestHarness$class.setUp(this);
    }

    /**
     * Generated super-accessor used by the KafkaServerTestHarness trait: forwards to the
     * ZooKeeperTestHarness implementation of {@code tearDown}.
     */
    @Override
    public /* synthetic */ void kafka$integration$KafkaServerTestHarness$$super$tearDown() {
        ZooKeeperTestHarness$class.tearDown(this);
    }

    /**
     * Returns the broker configurations as computed by the KafkaServerTestHarness trait
     * implementation for this instance.
     */
    @Override
    public Seq<KafkaConfig> configs() {
        return KafkaServerTestHarness$class.configs(this);
    }

    /**
     * Looks up a broker by id via the KafkaServerTestHarness trait implementation.
     *
     * @param id the broker id to look up
     * @return an Option holding the matching broker, as returned by the trait implementation
     */
    @Override
    public Option<KafkaServer> serverForId(int id) {
        return KafkaServerTestHarness$class.serverForId(this, id);
    }

    /**
     * Returns the security protocol chosen by the KafkaServerTestHarness trait implementation.
     */
    @Override
    public SecurityProtocol securityProtocol() {
        return KafkaServerTestHarness$class.securityProtocol(this);
    }

    /**
     * Returns the optional trust store file supplied by the KafkaServerTestHarness trait
     * implementation.
     */
    @Override
    public Option<File> trustStoreFile() {
        return KafkaServerTestHarness$class.trustStoreFile(this);
    }

    /**
     * JUnit {@code @Before} hook: delegates fixture set-up to the KafkaServerTestHarness trait
     * implementation.
     */
    @Override
    @Before
    public void setUp() {
        KafkaServerTestHarness$class.setUp(this);
    }

    /**
     * JUnit {@code @After} hook: delegates fixture tear-down to the KafkaServerTestHarness trait
     * implementation.
     */
    @Override
    @After
    public void tearDown() {
        KafkaServerTestHarness$class.tearDown(this);
    }

    /**
     * Delegates to the KafkaServerTestHarness trait implementation of {@code killRandomBroker}.
     *
     * @return the int produced by the trait implementation
     *         (NOTE(review): presumably the index of the stopped broker - confirm)
     */
    @Override
    public int killRandomBroker() {
        return KafkaServerTestHarness$class.killRandomBroker(this);
    }

    /**
     * Delegates to the KafkaServerTestHarness trait implementation of {@code restartDeadBrokers}.
     */
    @Override
    public void restartDeadBrokers() {
        KafkaServerTestHarness$class.restartDeadBrokers(this);
    }

    /**
     * Returns the embedded ZooKeeper instance stored for the ZooKeeperTestHarness.
     */
    @Override
    public EmbeddedZookeeper zookeeper() {
        return this.zookeeper;
    }

    /**
     * Setter counterpart of {@link #zookeeper()}.
     *
     * @param x$1 the embedded ZooKeeper instance to store
     */
    @Override
    public void zookeeper_$eq(EmbeddedZookeeper x$1) {
        this.zookeeper = x$1;
    }

    /**
     * Returns the ZooKeeper port stored for the ZooKeeperTestHarness.
     */
    @Override
    public int zkPort() {
        return this.zkPort;
    }

    /**
     * Setter counterpart of {@link #zkPort()}.
     *
     * @param x$1 the port to store
     */
    @Override
    public void zkPort_$eq(int x$1) {
        this.zkPort = x$1;
    }

    /**
     * Returns the ZkUtils handle that the tests pass to the topic-creation and
     * leader-election helpers.
     */
    @Override
    public ZkUtils zkUtils() {
        return this.zkUtils;
    }

    /**
     * Setter counterpart of {@link #zkUtils()}.
     *
     * @param x$1 the ZkUtils handle to store
     */
    @Override
    public void zkUtils_$eq(ZkUtils x$1) {
        this.zkUtils = x$1;
    }

    /**
     * Returns the stored ZooKeeper connection timeout value.
     */
    @Override
    public int zkConnectionTimeout() {
        return this.zkConnectionTimeout;
    }

    /**
     * Returns the stored ZooKeeper session timeout value.
     */
    @Override
    public int zkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    /**
     * Generated ZooKeeperTestHarness trait setter that stores the connection timeout value.
     *
     * @param x$1 the connection timeout to store
     */
    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int x$1) {
        this.zkConnectionTimeout = x$1;
    }

    /**
     * Generated ZooKeeperTestHarness trait setter that stores the session timeout value.
     *
     * @param x$1 the session timeout to store
     */
    @Override
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int x$1) {
        this.zkSessionTimeout = x$1;
    }

    /**
     * Returns the ZooKeeper connect string computed by the ZooKeeperTestHarness trait
     * implementation; it is passed to the broker config factory in {@code generateConfigs()}.
     */
    @Override
    public String zkConnect() {
        return ZooKeeperTestHarness$class.zkConnect(this);
    }

    /**
     * Returns the configuration file string supplied by the ZooKeeperTestHarness trait
     * implementation.
     */
    @Override
    public String confFile() {
        return ZooKeeperTestHarness$class.confFile(this);
    }

    /**
     * Returns the logger name stored for the Logging trait.
     */
    public String loggerName() {
        return this.loggerName;
    }

    /**
     * Slow path of the lazily initialized logger. While holding this instance's monitor it
     * obtains the logger from the Logging trait implementation if {@code bitmap$0} is still
     * unset, records that initialization happened, and returns the cached logger.
     *
     * @return the cached logger
     */
    private Logger logger$lzycompute() {
        SyncProducerTest syncProducerTest = this;
        synchronized (syncProducerTest) {
            // Check the flag under the lock so the logger is created at most once.
            if (!this.bitmap$0) {
                this.logger = Logging.class.logger((Logging)this);
                // Set the flag only after the field is assigned; logger() reads the flag first.
                this.bitmap$0 = true;
            }
            return this.logger;
        }
    }

    /**
     * Returns the logger for this test, creating it on first use.
     *
     * @return the cached logger when already initialized, otherwise the result of the
     *         synchronized initializer
     */
    public Logger logger() {
        // Fast path: the volatile flag says the field has already been populated.
        if (this.bitmap$0) {
            return this.logger;
        }
        return this.logger$lzycompute();
    }

    /**
     * Returns the log identifier string stored for the Logging trait.
     */
    public String logIdent() {
        return this.logIdent;
    }

    /**
     * Setter counterpart of {@link #logIdent()}.
     *
     * @param x$1 the log identifier to store
     */
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    /**
     * Returns the Log4jController module reference stored for the Logging trait.
     */
    public Log4jController$ kafka$utils$Logging$$log4jController() {
        return this.kafka$utils$Logging$$log4jController;
    }

    /**
     * Generated Logging trait setter that stores the logger name.
     *
     * @param x$1 the logger name to store
     */
    public void kafka$utils$Logging$_setter_$loggerName_$eq(String x$1) {
        this.loggerName = x$1;
    }

    /**
     * Generated Logging trait setter that stores the Log4jController module reference.
     *
     * @param x$1 the controller reference to store
     */
    public void kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq(Log4jController$ x$1) {
        this.kafka$utils$Logging$$log4jController = x$1;
    }

    /**
     * Forwards a lazily evaluated message to the Logging trait's {@code trace} implementation.
     *
     * @param msg supplier of the message text
     */
    public void trace(Function0<String> msg) {
        Logging.class.trace((Logging)this, msg);
    }

    /**
     * Forwards a lazily evaluated throwable to the Logging trait's {@code trace} implementation.
     *
     * @param e supplier of the throwable
     * @return whatever the trait implementation returns
     */
    public Object trace(Function0<Throwable> e) {
        return Logging.class.trace((Logging)this, e);
    }

    /**
     * Forwards a lazily evaluated message and throwable to the Logging trait's {@code trace}
     * implementation.
     *
     * @param msg supplier of the message text
     * @param e supplier of the throwable
     */
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.trace((Logging)this, msg, e);
    }

    /**
     * Hands the action to the Logging trait's {@code swallowTrace} implementation.
     * NOTE(review): presumably runs the action and logs any failure at trace level - confirm.
     *
     * @param action the deferred action
     */
    public void swallowTrace(Function0<BoxedUnit> action) {
        Logging.class.swallowTrace((Logging)this, action);
    }

    /**
     * Forwards a lazily evaluated message to the Logging trait's {@code debug} implementation.
     *
     * @param msg supplier of the message text
     */
    public void debug(Function0<String> msg) {
        Logging.class.debug((Logging)this, msg);
    }

    /**
     * Forwards a lazily evaluated throwable to the Logging trait's {@code debug} implementation.
     *
     * @param e supplier of the throwable
     * @return whatever the trait implementation returns
     */
    public Object debug(Function0<Throwable> e) {
        return Logging.class.debug((Logging)this, e);
    }

    /**
     * Forwards a lazily evaluated message and throwable to the Logging trait's {@code debug}
     * implementation.
     *
     * @param msg supplier of the message text
     * @param e supplier of the throwable
     */
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.debug((Logging)this, msg, e);
    }

    /**
     * Hands the action to the Logging trait's {@code swallowDebug} implementation.
     * NOTE(review): presumably runs the action and logs any failure at debug level - confirm.
     *
     * @param action the deferred action
     */
    public void swallowDebug(Function0<BoxedUnit> action) {
        Logging.class.swallowDebug((Logging)this, action);
    }

    /**
     * Forwards a lazily evaluated message to the Logging trait's {@code info} implementation.
     *
     * @param msg supplier of the message text
     */
    public void info(Function0<String> msg) {
        Logging.class.info((Logging)this, msg);
    }

    /**
     * Forwards a lazily evaluated throwable to the Logging trait's {@code info} implementation.
     *
     * @param e supplier of the throwable
     * @return whatever the trait implementation returns
     */
    public Object info(Function0<Throwable> e) {
        return Logging.class.info((Logging)this, e);
    }

    /**
     * Forwards a lazily evaluated message and throwable to the Logging trait's {@code info}
     * implementation.
     *
     * @param msg supplier of the message text
     * @param e supplier of the throwable
     */
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.info((Logging)this, msg, e);
    }

    /**
     * Hands the action to the Logging trait's {@code swallowInfo} implementation.
     * NOTE(review): presumably runs the action and logs any failure at info level - confirm.
     *
     * @param action the deferred action
     */
    public void swallowInfo(Function0<BoxedUnit> action) {
        Logging.class.swallowInfo((Logging)this, action);
    }

    /**
     * Forwards a lazily evaluated message to the Logging trait's {@code warn} implementation.
     *
     * @param msg supplier of the message text
     */
    public void warn(Function0<String> msg) {
        Logging.class.warn((Logging)this, msg);
    }

    /**
     * Forwards a lazily evaluated throwable to the Logging trait's {@code warn} implementation.
     *
     * @param e supplier of the throwable
     * @return whatever the trait implementation returns
     */
    public Object warn(Function0<Throwable> e) {
        return Logging.class.warn((Logging)this, e);
    }

    /**
     * Forwards a lazily evaluated message and throwable to the Logging trait's {@code warn}
     * implementation.
     *
     * @param msg supplier of the message text
     * @param e supplier of the throwable
     */
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.warn((Logging)this, msg, e);
    }

    /**
     * Hands the action to the Logging trait's {@code swallowWarn} implementation.
     * NOTE(review): presumably runs the action and logs any failure at warn level - confirm.
     *
     * @param action the deferred action
     */
    public void swallowWarn(Function0<BoxedUnit> action) {
        Logging.class.swallowWarn((Logging)this, action);
    }

    /**
     * Hands the action to the Logging trait's {@code swallow} implementation.
     * NOTE(review): the level used for a swallowed failure is decided by the trait - confirm.
     *
     * @param action the deferred action
     */
    public void swallow(Function0<BoxedUnit> action) {
        Logging.class.swallow((Logging)this, action);
    }

    /**
     * Forwards a lazily evaluated message to the Logging trait's {@code error} implementation.
     *
     * @param msg supplier of the message text
     */
    public void error(Function0<String> msg) {
        Logging.class.error((Logging)this, msg);
    }

    /**
     * Forwards a lazily evaluated throwable to the Logging trait's {@code error} implementation.
     *
     * @param e supplier of the throwable
     * @return whatever the trait implementation returns
     */
    public Object error(Function0<Throwable> e) {
        return Logging.class.error((Logging)this, e);
    }

    /**
     * Forwards a lazily evaluated message and throwable to the Logging trait's {@code error}
     * implementation.
     *
     * @param msg supplier of the message text
     * @param e supplier of the throwable
     */
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.error((Logging)this, msg, e);
    }

    /**
     * Hands the action to the Logging trait's {@code swallowError} implementation.
     * NOTE(review): presumably runs the action and logs any failure at error level - confirm.
     *
     * @param action the deferred action
     */
    public void swallowError(Function0<BoxedUnit> action) {
        Logging.class.swallowError((Logging)this, action);
    }

    /**
     * Forwards a lazily evaluated message to the Logging trait's {@code fatal} implementation.
     *
     * @param msg supplier of the message text
     */
    public void fatal(Function0<String> msg) {
        Logging.class.fatal((Logging)this, msg);
    }

    /**
     * Forwards a lazily evaluated throwable to the Logging trait's {@code fatal} implementation.
     *
     * @param e supplier of the throwable
     * @return whatever the trait implementation returns
     */
    public Object fatal(Function0<Throwable> e) {
        return Logging.class.fatal((Logging)this, e);
    }

    /**
     * Forwards a lazily evaluated message and throwable to the Logging trait's {@code fatal}
     * implementation.
     *
     * @param msg supplier of the message text
     * @param e supplier of the throwable
     */
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.class.fatal((Logging)this, msg, e);
    }

    /**
     * Returns the shared payload array (two bytes, allocated in the constructor) that the
     * tests wrap in a {@code Message}. The array itself is returned, not a copy.
     */
    private byte[] messageBytes() {
        return this.messageBytes;
    }

    /**
     * Builds the broker configuration list for this test: a single {@code KafkaConfig}
     * derived from the first entry returned by {@code TestUtils.createBrokerConfigs} when
     * asked for one broker with this harness's ZooKeeper connect string.
     *
     * @return a one-element list holding that configuration
     */
    public List<KafkaConfig> generateConfigs() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        // Request one broker config; the third argument is false and the rest keep their defaults.
        Properties firstBrokerProps = (Properties)testUtils.createBrokerConfigs(
                1,
                this.zkConnect(),
                false,
                testUtils.createBrokerConfigs$default$4(),
                testUtils.createBrokerConfigs$default$5(),
                testUtils.createBrokerConfigs$default$6(),
                testUtils.createBrokerConfigs$default$7(),
                testUtils.createBrokerConfigs$default$8(),
                testUtils.createBrokerConfigs$default$9(),
                testUtils.createBrokerConfigs$default$10()).head();
        KafkaConfig[] brokerConfigs = new KafkaConfig[]{KafkaConfig$.MODULE$.fromProps(firstBrokerProps)};
        return List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])brokerConfigs));
    }

    /**
     * Sends three identical single-message produce requests (topic "test", partition 0,
     * acks 1) to the first broker. Every send must return a non-null response without
     * throwing, and each of the first two sends must complete in under 500 ms.
     */
    @Test
    public void testReachableServer() {
        KafkaServer broker = (KafkaServer)this.servers().head();
        SocketServer socketServer = broker.socketServer();
        SecurityProtocol defaultProtocol = socketServer.boundPort$default$1();
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(socketServer.boundPort(defaultProtocol));
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(producerProps));

        // First send, timed.
        long firstStart = SystemTime$.MODULE$.milliseconds();
        this.sendAndRequireResponse(producer);
        long firstEnd = SystemTime$.MODULE$.milliseconds();
        Assert.assertTrue(firstEnd - firstStart < 500L);

        // Second send, timed the same way.
        long secondStart = SystemTime$.MODULE$.milliseconds();
        this.sendAndRequireResponse(producer);
        long secondEnd = SystemTime$.MODULE$.milliseconds();
        Assert.assertTrue(secondEnd - secondStart < 500L);

        // Third send is only checked for a response, not for latency.
        this.sendAndRequireResponse(producer);
    }

    /**
     * Sends one small message to topic "test", partition 0 with acks 1 and asserts that a
     * response object comes back; any {@code Exception} raised while doing so fails the test.
     *
     * @param producer the producer to send through
     */
    private void sendAndRequireResponse(SyncProducer producer) {
        try {
            ByteBufferMessageSet payload = new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(this.messageBytes())}));
            ProducerRequest request = TestUtils$.MODULE$.produceRequest("test", 0, payload, 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7());
            ProducerResponse response = producer.send(request);
            Assert.assertNotNull((Object)response);
        }
        catch (Exception exception) {
            Assert.fail((String)new StringBuilder().append((Object)"Unexpected failure sending message to broker. ").append((Object)exception.getMessage()).toString());
        }
    }

    /**
     * Sends a produce request that carries no data (acks 1) and checks that the broker
     * answers with a non-null response that has no error and an empty status map.
     */
    @Test
    public void testEmptyProduceRequest() {
        KafkaServer broker = (KafkaServer)this.servers().head();
        SocketServer socketServer = broker.socketServer();
        SecurityProtocol defaultProtocol = socketServer.boundPort$default$1();
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(socketServer.boundPort(defaultProtocol));
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(producerProps));

        // Request header values: default client id and ack timeout, acks = 1, no payload.
        int correlationId = 0;
        String clientId = SyncProducerConfig$.MODULE$.DefaultClientId();
        short ack = 1;
        int ackTimeoutMs = SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs();
        ProducerRequest emptyRequest = new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$));

        ProducerResponse response = producer.send(emptyRequest);
        Assert.assertTrue(response != null);
        boolean cleanAndEmpty = !response.hasError() && response.status().size() == 0;
        Assert.assertTrue(cleanAndEmpty);
    }

    /**
     * Verifies broker-side message size enforcement with acks = 1.
     *
     * <p>A message whose payload is one byte larger than {@code messageMaxBytes} must be
     * rejected with {@code MessageSizeTooLargeCode} and offset -1. A message sized to stay
     * below the limit once message and log overhead are included must be accepted with
     * {@code NoError} at offset 0.
     */
    @Test
    public void testMessageSizeTooLarge() {
        KafkaServer server = (KafkaServer)this.servers().head();
        SocketServer qual$3 = server.socketServer();
        SecurityProtocol x$5 = qual$3.boundPort$default$1();
        Properties props = TestUtils$.MODULE$.getSyncProducerConfig(qual$3.boundPort(x$5));
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
        TestUtils$.MODULE$.createTopic(this.zkUtils(), "test", 1, 1, (Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.createTopic$default$6());

        // Oversized message: payload alone already exceeds the configured maximum by one byte.
        Message message1 = new Message(new byte[Predef$.MODULE$.Integer2int(((KafkaConfig)this.configs().apply(0)).messageMaxBytes()) + 1]);
        ByteBufferMessageSet messageSet1 = new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{message1}));
        ProducerResponse response1 = producer.send(TestUtils$.MODULE$.produceRequest("test", 0, messageSet1, 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
        // Exactly one partition status in the first response must carry an error.
        Assert.assertEquals((long)1L, (long)response1.status().count((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final boolean apply(Tuple2<TopicAndPartition, ProducerResponseStatus> x$1) {
                return ((ProducerResponseStatus)x$1._2()).error() != ErrorMapping$.MODULE$.NoError();
            }
        }));
        Assert.assertEquals((long)ErrorMapping$.MODULE$.MessageSizeTooLargeCode(), (long)((ProducerResponseStatus)response1.status().apply((Object)new TopicAndPartition("test", 0))).error());
        Assert.assertEquals((long)-1L, (long)((ProducerResponseStatus)response1.status().apply((Object)new TopicAndPartition("test", 0))).offset());

        // Largest payload that still fits under the limit after subtracting per-message and log overhead.
        int safeSize = Predef$.MODULE$.Integer2int(((KafkaConfig)this.configs().apply(0)).messageMaxBytes()) - Message$.MODULE$.MessageOverhead() - MessageSet$.MODULE$.LogOverhead() - 1;
        Message message2 = new Message(new byte[safeSize]);
        ByteBufferMessageSet messageSet2 = new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{message2}));
        ProducerResponse response2 = producer.send(TestUtils$.MODULE$.produceRequest("test", 0, messageSet2, 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
        // The second response must contain no failed partition. This check previously
        // re-inspected response1, so it never validated the second send.
        Assert.assertEquals((long)0L, (long)response2.status().count((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final boolean apply(Tuple2<TopicAndPartition, ProducerResponseStatus> x$2) {
                return ((ProducerResponseStatus)x$2._2()).error() != ErrorMapping$.MODULE$.NoError();
            }
        }));
        Assert.assertEquals((long)ErrorMapping$.MODULE$.NoError(), (long)((ProducerResponseStatus)response2.status().apply((Object)new TopicAndPartition("test", 0))).error());
        Assert.assertEquals((long)0L, (long)((ProducerResponseStatus)response2.status().apply((Object)new TopicAndPartition("test", 0))).offset());
    }

    /**
     * Sends two oversized messages (payload one byte above {@code messageMaxBytes}) with
     * acks = 0. The first send is expected to return normally; the second send may fail with
     * an {@code IOException}, which is tolerated. Any other throwable propagates and fails
     * the test.
     *
     * <p>NOTE(review): the IOException is presumably caused by the broker closing the
     * connection after the first oversized request - confirm against broker behaviour.
     */
    @Test
    public void testMessageSizeTooLargeWithAckZero() {
        KafkaServer server = (KafkaServer)this.servers().head();
        SocketServer qual$4 = server.socketServer();
        SecurityProtocol x$6 = qual$4.boundPort$default$1();
        Properties props = TestUtils$.MODULE$.getSyncProducerConfig(qual$4.boundPort(x$6));
        props.put("request.required.acks", "0");
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
        AdminUtils$.MODULE$.createTopic(this.zkUtils(), "test", 1, 1, AdminUtils$.MODULE$.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), "test", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        // First oversized send: with acks = 0 no response is awaited, so no error is asserted here.
        producer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(new byte[Predef$.MODULE$.Integer2int(((KafkaConfig)this.configs().apply(0)).messageMaxBytes()) + 1])})), 0, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
        try {
            producer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(new byte[Predef$.MODULE$.Integer2int(((KafkaConfig)this.configs().apply(0)).messageMaxBytes()) + 1])})), 0, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
        }
        catch (Throwable throwable) {
            // A single handler replaces the former Throwable-then-IOException pair, whose
            // IOException clause was unreachable. Testing the runtime type also avoids
            // depending on send() declaring the checked IOException.
            if (!(throwable instanceof IOException)) {
                throw throwable;
            }
            // IOException on the second send is the tolerated outcome.
        }
    }

    /**
     * Checks per-partition statuses in produce responses.
     *
     * <p>A request for partition 0 of "topic1", "topic2" and "topic3" is first sent while
     * none of the topics exist: all three statuses must report
     * {@code UnknownTopicOrPartitionCode} with offset -1. After "topic1" and "topic3" are
     * created and have leaders, the same request is sent again: those two must report
     * {@code NoError} at offset 0, while "topic2" must still be unknown with offset -1.
     */
    @Test
    public void testProduceCorrectlyReceivesResponse() {
        KafkaServer broker = (KafkaServer)this.servers().head();
        SocketServer socketServer = broker.socketServer();
        SecurityProtocol defaultProtocol = socketServer.boundPort$default$1();
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(socketServer.boundPort(defaultProtocol));
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(producerProps));

        ByteBufferMessageSet payload = new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(this.messageBytes())}));
        ProducerRequest request = TestUtils$.MODULE$.produceRequestWithAcks(
                (Seq<String>)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"topic1", "topic2", "topic3"}),
                (Seq<Object>)Predef$.MODULE$.wrapIntArray(new int[]{0}),
                payload,
                1,
                TestUtils$.MODULE$.produceRequestWithAcks$default$5(),
                TestUtils$.MODULE$.produceRequestWithAcks$default$6(),
                TestUtils$.MODULE$.produceRequestWithAcks$default$7());

        // Round one: no topic exists yet, so every partition status must be "unknown topic".
        ProducerResponse firstResponse = producer.send(request);
        Assert.assertNotNull((Object)firstResponse);
        Assert.assertEquals((long)request.correlationId(), (long)firstResponse.correlationId());
        Assert.assertEquals((long)3L, (long)firstResponse.status().size());
        firstResponse.status().values().foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final void apply(ProducerResponseStatus status) {
                if (status == null) {
                    throw new MatchError((Object)status);
                }
                short errorCode = status.error();
                long nextOffset = status.offset();
                Assert.assertEquals((long)ErrorMapping$.MODULE$.UnknownTopicOrPartitionCode(), (long)errorCode);
                Assert.assertEquals((long)-1L, (long)nextOffset);
            }
        });

        // Create two of the three topics and wait for each to have a leader.
        AdminUtils$.MODULE$.createTopic(this.zkUtils(), "topic1", 1, 1, AdminUtils$.MODULE$.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), "topic1", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        AdminUtils$.MODULE$.createTopic(this.zkUtils(), "topic3", 1, 1, AdminUtils$.MODULE$.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), "topic3", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());

        // Round two: same request, now two topics exist.
        ProducerResponse secondResponse = producer.send(request);
        Assert.assertNotNull((Object)secondResponse);
        Assert.assertEquals((long)request.correlationId(), (long)secondResponse.correlationId());
        Assert.assertEquals((long)3L, (long)secondResponse.status().size());

        ProducerResponseStatus topic1Status = (ProducerResponseStatus)secondResponse.status().apply((Object)new TopicAndPartition("topic1", 0));
        Assert.assertEquals((long)ErrorMapping$.MODULE$.NoError(), (long)topic1Status.error());
        ProducerResponseStatus topic3Status = (ProducerResponseStatus)secondResponse.status().apply((Object)new TopicAndPartition("topic3", 0));
        Assert.assertEquals((long)ErrorMapping$.MODULE$.NoError(), (long)topic3Status.error());
        Assert.assertEquals((long)0L, (long)((ProducerResponseStatus)secondResponse.status().apply((Object)new TopicAndPartition("topic1", 0))).offset());
        Assert.assertEquals((long)0L, (long)((ProducerResponseStatus)secondResponse.status().apply((Object)new TopicAndPartition("topic3", 0))).offset());

        // "topic2" was never created and must still be reported as unknown.
        ProducerResponseStatus topic2Status = (ProducerResponseStatus)secondResponse.status().apply((Object)new TopicAndPartition("topic2", 0));
        Assert.assertEquals((long)ErrorMapping$.MODULE$.UnknownTopicOrPartitionCode(), (long)topic2Status.error());
        Assert.assertEquals((long)-1L, (long)((ProducerResponseStatus)secondResponse.status().apply((Object)new TopicAndPartition("topic2", 0))).offset());
    }

    /**
     * Verifies that a send times out when the broker stops handling requests.
     *
     * <p>The broker's request handler pool is shut down, then a produce request is sent.
     * The send must fail with a {@code SocketTimeoutException}, and at least
     * {@code timeoutMs} milliseconds must have elapsed. Any other throwable, or a send that
     * returns normally, fails the test.
     */
    @Test
    public void testProducerCanTimeout() {
        // NOTE(review): presumably matches the request timeout configured by
        // getSyncProducerConfig - confirm; it is not applied to props in this method.
        int timeoutMs = 500;
        KafkaServer server = (KafkaServer)this.servers().head();
        SocketServer qual$6 = server.socketServer();
        SecurityProtocol x$8 = qual$6.boundPort$default$1();
        Properties props = TestUtils$.MODULE$.getSyncProducerConfig(qual$6.boundPort(x$8));
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(props));
        ByteBufferMessageSet messages2 = new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(this.messageBytes())}));
        ProducerRequest request = TestUtils$.MODULE$.produceRequest("topic1", 0, messages2, 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7());
        // Stop request processing so the broker never answers.
        server.requestHandlerPool().shutdown();
        long t1 = SystemTime$.MODULE$.milliseconds();
        boolean timedOut = false;
        try {
            producer.send(request);
        }
        catch (Throwable throwable) {
            // The expected timeout must be recognised before the generic failure path. The
            // former Throwable-first clause order made the SocketTimeoutException handler
            // unreachable and turned the expected timeout into a test failure.
            if (throwable instanceof SocketTimeoutException) {
                timedOut = true;
            } else {
                Assert.fail((String)new StringBuilder().append((Object)"Unexpected exception when expecting timeout: ").append((Object)throwable).toString());
            }
        }
        long t2 = SystemTime$.MODULE$.milliseconds();
        // Raised outside the try so this failure is not re-wrapped as an "unexpected exception".
        if (!timedOut) {
            Assert.fail((String)"Should have received timeout exception since request handling is stopped.");
        }
        Assert.assertTrue((t2 - t1 >= (long)timeoutMs ? 1 : 0) != 0);
    }

    /**
     * Sends a produce request that carries no data with acks = 0 over the PLAINTEXT port
     * and checks that {@code send} returns {@code null}.
     */
    @Test
    public void testProduceRequestWithNoResponse() {
        KafkaServer broker = (KafkaServer)this.servers().head();
        int port = broker.socketServer().boundPort(SecurityProtocol.PLAINTEXT);
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(port);
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(producerProps));

        // Request header values: default client id and ack timeout, acks = 0, no payload.
        int correlationId = 0;
        String clientId = SyncProducerConfig$.MODULE$.DefaultClientId();
        short ack = 0;
        int ackTimeoutMs = SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs();
        ProducerRequest emptyRequest = new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, (Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$));

        ProducerResponse response = producer.send(emptyRequest);
        Assert.assertTrue(response == null);
    }

    /**
     * Creates topic "minisrtest" with one partition, replication factor 1 and
     * {@code min.insync.replicas=2}, then produces to it with acks = -1. The status for
     * partition 0 must report {@code NotEnoughReplicasCode}.
     */
    @Test
    public void testNotEnoughReplicas() {
        String topicName = "minisrtest";
        KafkaServer broker = (KafkaServer)this.servers().head();
        SocketServer socketServer = broker.socketServer();
        SecurityProtocol defaultProtocol = socketServer.boundPort$default$1();
        Properties producerProps = TestUtils$.MODULE$.getSyncProducerConfig(socketServer.boundPort(defaultProtocol));
        producerProps.put("request.required.acks", "-1");
        SyncProducer producer = new SyncProducer(new SyncProducerConfig(producerProps));

        // The topic demands two in-sync replicas while only a single replica is created.
        Properties topicProps = new Properties();
        topicProps.put("min.insync.replicas", "2");
        AdminUtils$.MODULE$.createTopic(this.zkUtils(), topicName, 1, 1, topicProps);
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkUtils(), topicName, 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());

        ByteBufferMessageSet payload = new ByteBufferMessageSet((CompressionCodec)NoCompressionCodec$.MODULE$, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Message[]{new Message(this.messageBytes())}));
        ProducerRequest request = TestUtils$.MODULE$.produceRequest(topicName, 0, payload, -1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7());
        ProducerResponse response = producer.send(request);

        ProducerResponseStatus partitionStatus = (ProducerResponseStatus)response.status().apply((Object)new TopicAndPartition(topicName, 0));
        Assert.assertEquals((long)ErrorMapping$.MODULE$.NotEnoughReplicasCode(), (long)partitionStatus.error());
    }

    /**
     * Creates the test instance: runs the initializers of the mixed-in Logging,
     * ZooKeeperTestHarness and KafkaServerTestHarness traits (in that order) and then
     * allocates the two-byte payload used for test messages.
     */
    public SyncProducerTest() {
        // Trait initializers run first, in this order; do not reorder.
        Logging.class.$init$((Logging)this);
        ZooKeeperTestHarness$class.$init$(this);
        KafkaServerTestHarness$class.$init$(this);
        this.messageBytes = new byte[2];
    }
}

