/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ExtendedSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockPartitioner;
import org.apache.kafka.test.MockProducerInterceptor;
import org.apache.kafka.test.MockSerializer;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class KafkaProducerTest {
    // Topic name used by the cluster fixtures below and by the metadata tests.
    private String topic = "topic";
    // Single-node collection (node id 0, "host1", port 1000) used to build emptyCluster.
    private Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
    // Cluster fixture with one node and no partitions at all.
    private final Cluster emptyCluster = new Cluster(null, this.nodes, Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
    // Cluster fixture in which `topic` has exactly one partition (partition 0).
    private final Cluster onePartitionCluster = new Cluster("dummy", Collections.singletonList(new Node(0, "host1", 1000)), Collections.singletonList(new PartitionInfo(this.topic, 0, null, null, null)), Collections.emptySet(), Collections.emptySet());
    // Cluster fixture in which `topic` has three partitions (0, 1 and 2).
    private final Cluster threePartitionCluster = new Cluster("dummy", Collections.singletonList(new Node(0, "host1", 1000)), Arrays.asList(new PartitionInfo(this.topic, 0, null, null, null), new PartitionInfo(this.topic, 1, null, null, null), new PartitionInfo(this.topic, 2, null, null, null)), Collections.emptySet(), Collections.emptySet());

    /**
     * Checks that the first metrics reporter registered through {@code metric.reporters} reports the same
     * client id as the producer when no {@code client.id} is configured.
     */
    @Test
    public void testMetricsReporterAutoGeneratedClientId() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        // try-with-resources: the producer is released even when the assertion below fails.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            MockMetricsReporter mockMetricsReporter = (MockMetricsReporter) producer.metrics.reporters().get(0);
            Assert.assertEquals(producer.getClientId(), mockMetricsReporter.clientId);
        }
    }

    /**
     * Smoke test: a producer built from {@link Properties} plus explicit serializer instances can be
     * constructed and closed.
     */
    @Test
    public void testConstructorWithSerializers() {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9000");
        KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
        producer.close();
    }

    /**
     * Constructing a producer from properties that name no serializers is expected to fail with a
     * {@link ConfigException}.
     */
    @Test(expected = ConfigException.class)
    public void testNoSerializerProvided() {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9000");
        new KafkaProducer<>(config);
    }

    /**
     * When construction fails with a {@link KafkaException}, the metrics reporter must have been both
     * initialised and closed exactly once, and the exception must carry the expected message.
     */
    @Test
    public void testConstructorFailureCloseResource() {
        Properties props = new Properties();
        props.setProperty("client.id", "testConstructorClose");
        props.setProperty("bootstrap.servers", "some.invalid.hostname.foo.bar.local:9999");
        props.setProperty("metric.reporters", MockMetricsReporter.class.getName());

        // Counters are static, so compare against a snapshot rather than absolute values.
        final int initCountBefore = MockMetricsReporter.INIT_COUNT.get();
        final int closeCountBefore = MockMetricsReporter.CLOSE_COUNT.get();

        try (KafkaProducer<byte[], byte[]> ignored =
                     new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
            Assert.fail("should have caught an exception and returned");
        } catch (KafkaException e) {
            Assert.assertEquals(initCountBefore + 1, MockMetricsReporter.INIT_COUNT.get());
            Assert.assertEquals(closeCountBefore + 1, MockMetricsReporter.CLOSE_COUNT.get());
            Assert.assertEquals("Failed to construct kafka producer", e.getMessage());
        }
    }

    /**
     * A {@link Properties} entry with a non-String key must make the constructor throw a
     * {@link ConfigException} whose message contains the offending entry's value.
     */
    @Test
    public void testConstructorWithNotStringKey() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.put(1, "not string key");
        try (KafkaProducer<String, String> ignored =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            Assert.fail("Constructor should throw exception");
        } catch (ConfigException e) {
            String message = e.getMessage();
            Assert.assertTrue("Unexpected exception message: " + message, message.contains("not string key"));
        }
    }

    /**
     * Both serializers are initialised once at construction and closed once by {@code close()}; the
     * counts are checked before and after closing the producer.
     */
    @Test
    public void testSerializerClose() {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put("client.id", "testConstructorClose");
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("metric.reporters", MockMetricsReporter.class.getName());
        configs.put("security.protocol", "PLAINTEXT");

        // Static counters: assert on deltas relative to this snapshot.
        final int initsBefore = MockSerializer.INIT_COUNT.get();
        final int closesBefore = MockSerializer.CLOSE_COUNT.get();

        KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(configs, new MockSerializer(), new MockSerializer());
        Assert.assertEquals(initsBefore + 2, MockSerializer.INIT_COUNT.get());
        Assert.assertEquals(closesBefore, MockSerializer.CLOSE_COUNT.get());

        producer.close();
        Assert.assertEquals(initsBefore + 2, MockSerializer.INIT_COUNT.get());
        Assert.assertEquals(closesBefore + 2, MockSerializer.CLOSE_COUNT.get());
    }

    /**
     * The configured interceptor is initialised once on construction and closed once on
     * {@code close()}; its cluster metadata reference is still null right after construction.
     * Counters are reset afterwards regardless of the outcome.
     */
    @Test
    public void testInterceptorConstructClose() {
        try {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9999");
            props.setProperty("interceptor.classes", MockProducerInterceptor.class.getName());
            props.setProperty("mock.interceptor.append", "something");

            KafkaProducer<String, String> producer =
                    new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
            Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
            Assert.assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());
            Assert.assertNull(MockProducerInterceptor.CLUSTER_META.get());

            producer.close();
            Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
            Assert.assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get());
        } finally {
            // Clean up the static counters so they do not leak into other tests.
            MockProducerInterceptor.resetCounters();
        }
    }

    /**
     * The configured partitioner is initialised once on construction and closed once on
     * {@code close()}. Counters are reset before the producer is built and again afterwards.
     */
    @Test
    public void testPartitionerClose() {
        try {
            MockPartitioner.resetCounters();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9999");
            props.setProperty("partitioner.class", MockPartitioner.class.getName());

            KafkaProducer<String, String> producer =
                    new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
            Assert.assertEquals(1, MockPartitioner.INIT_COUNT.get());
            Assert.assertEquals(0, MockPartitioner.CLOSE_COUNT.get());

            producer.close();
            Assert.assertEquals(1, MockPartitioner.INIT_COUNT.get());
            Assert.assertEquals(1, MockPartitioner.CLOSE_COUNT.get());
        } finally {
            MockPartitioner.resetCounters();
        }
    }

    /**
     * Sends one record and then calls {@code close()} on a worker thread, checks that close does not
     * finish within 100 ms, waits for a request to reach the mock client, cancels the worker with
     * interruption and finally expects the close call to have failed with an {@link InterruptException}.
     */
    @Test
    public void shouldCloseProperlyAndThrowIfInterrupted() throws Exception {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("partitioner.class", MockPartitioner.class.getName());
        configs.put("batch.size", "1");

        MockTime time = new MockTime();
        Cluster cluster = TestUtils.singletonCluster("topic", 1);
        Node node = cluster.nodes().get(0);

        Metadata metadata = new Metadata(0L, Long.MAX_VALUE, true);
        metadata.update(cluster, Collections.emptySet(), time.milliseconds());

        MockClient client = new MockClient(time, metadata);
        client.setNode(node);

        Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
                metadata, client, null, time);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<Exception> closeException = new AtomicReference<>();
        try {
            Future<?> future = executor.submit(() -> {
                producer.send(new ProducerRecord<>("topic", "key", "value"));
                try {
                    producer.close();
                    Assert.fail("Close should block and throw.");
                } catch (Exception e) {
                    closeException.set(e);
                }
            });

            // Close must still be blocked after 100 ms; a normal return from get() is a failure.
            try {
                future.get(100L, TimeUnit.MILLISECONDS);
                Assert.fail("Close completed without waiting for send");
            } catch (TimeoutException expected) {
                // expected: the worker is still inside close()
            }

            // Wait for one request to reach the mock client before interrupting the worker.
            client.waitForRequests(1, 1000L);
            Assert.assertTrue("Close terminated prematurely", future.cancel(true));

            TestUtils.waitForCondition(() -> closeException.get() != null, "InterruptException did not occur within timeout.");

            Assert.assertTrue("Expected exception not thrown " + closeException, closeException.get() instanceof InterruptException);
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * A value of -1 for both socket buffer sizes is accepted: the producer can be constructed and closed.
     */
    @Test
    public void testOsDefaultSocketBufferSizes() {
        HashMap<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9999");
        config.put("send.buffer.bytes", -1);
        config.put("receive.buffer.bytes", -1);
        KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
        producer.close();
    }

    /**
     * A send buffer size of -2 is expected to make construction fail with a {@link KafkaException}.
     */
    @Test(expected = KafkaException.class)
    public void testInvalidSocketSendBufferSize() {
        HashMap<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9999");
        config.put("send.buffer.bytes", -2);
        new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
    }

    /**
     * A receive buffer size of -2 is expected to make construction fail with a {@link KafkaException}.
     */
    @Test(expected = KafkaException.class)
    public void testInvalidSocketReceiveBufferSize() {
        HashMap<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9999");
        config.put("receive.buffer.bytes", -2);
        new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
    }

    /**
     * Counts the calls the producer makes on a mocked {@link Metadata} across three operations.
     * {@code fetch()} is stubbed to return the empty cluster four times and then the one-partition
     * cluster. After the first {@code send}, four {@code requestUpdate}/{@code awaitUpdate} calls and
     * five {@code fetch} calls are expected; the second {@code send} and {@code partitionsFor} are each
     * expected to add exactly one more {@code fetch} and no further update requests.
     *
     * @throws InterruptedException declared because the verified {@code awaitUpdate} call declares it
     */
    @Test
    public void testMetadataFetch() throws InterruptedException {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        Metadata metadata = Mockito.mock(Metadata.class);

        Mockito.when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, emptyCluster, onePartitionCluster);

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs, new StringSerializer(),
                new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) {
            @Override
            Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata producerMetadata) {
                // The Sender gets its own Metadata instance rather than the mock, presumably so that
                // Sender activity does not disturb the call counts verified below.
                return super.newSender(logContext, kafkaClient, new Metadata(0L, 100000L, true));
            }
        };

        // Close in finally so a failed verification does not leak the producer.
        try {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");

            producer.send(record);
            Mockito.verify(metadata, Mockito.times(4)).requestUpdate();
            Mockito.verify(metadata, Mockito.times(4)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
            Mockito.verify(metadata, Mockito.times(5)).fetch();

            // Second send: one extra fetch, no additional update requests.
            producer.send(record, null);
            Mockito.verify(metadata, Mockito.times(4)).requestUpdate();
            Mockito.verify(metadata, Mockito.times(4)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
            Mockito.verify(metadata, Mockito.times(6)).fetch();

            // partitionsFor: again one extra fetch, no additional update requests.
            producer.partitionsFor(topic);
            Mockito.verify(metadata, Mockito.times(4)).requestUpdate();
            Mockito.verify(metadata, Mockito.times(4)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
            Mockito.verify(metadata, Mockito.times(7)).fetch();
        } finally {
            producer.close(0L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Sends to a topic that the mocked {@link Metadata} never reports (every {@code fetch()} returns the
     * empty cluster). On the fifth {@code fetch()} the mock clock is advanced by 70 s, which exceeds the
     * configured {@code max.block.ms} of 60 s. The test expects four {@code requestUpdate}/
     * {@code awaitUpdate} calls, five {@code fetch} calls, and a returned future that fails with a
     * Kafka {@code TimeoutException} as its cause.
     */
    @Test
    public void testMetadataTimeoutWithMissingTopic() throws Exception {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", 60000);

        // Create a record with a partition higher than the initial (outdated) partition range
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");
        Metadata metadata = Mockito.mock(Metadata.class);

        MockTime mockTime = new MockTime();
        AtomicInteger invocationCount = new AtomicInteger(0);
        Mockito.when(metadata.fetch()).then(invocation -> {
            // Jump the clock past max.block.ms on the fifth fetch.
            if (invocationCount.incrementAndGet() == 5) {
                mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000L);
            }
            return emptyCluster;
        });

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs, new StringSerializer(),
                new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata), null, mockTime) {
            @Override
            Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata producerMetadata) {
                // The Sender gets its own Metadata instance rather than the mock, presumably so that
                // Sender activity does not disturb the call counts verified below.
                return super.newSender(logContext, kafkaClient, new Metadata(0L, 100000L, true));
            }
        };

        // Everything after construction runs inside try/finally so the producer is always closed.
        try {
            Future<?> future = producer.send(record);

            Mockito.verify(metadata, Mockito.times(4)).requestUpdate();
            Mockito.verify(metadata, Mockito.times(4)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
            Mockito.verify(metadata, Mockito.times(5)).fetch();

            try {
                future.get();
                // Without this the test would pass silently if the send did not time out.
                Assert.fail("Expected the send future to fail with a TimeoutException");
            } catch (ExecutionException e) {
                Assert.assertTrue(e.getCause() instanceof org.apache.kafka.common.errors.TimeoutException);
            }
        } finally {
            producer.close(0L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Sends a record addressed to partition 2 while the mocked {@link Metadata} first reports the
     * one-partition cluster twice and then the three-partition cluster. The test expects two
     * {@code requestUpdate}/{@code awaitUpdate} calls and three {@code fetch} calls for the send.
     */
    @Test
    public void testMetadataWithPartitionOutOfRange() throws Exception {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", 60000);

        // Create a record with a partition higher than the initial (outdated) partition range
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");
        Metadata metadata = Mockito.mock(Metadata.class);

        MockTime mockTime = new MockTime();

        Mockito.when(metadata.fetch()).thenReturn(onePartitionCluster, onePartitionCluster, threePartitionCluster);

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs, new StringSerializer(),
                new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata), null, mockTime) {
            @Override
            Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata producerMetadata) {
                // The Sender gets its own Metadata instance rather than the mock, presumably so that
                // Sender activity does not disturb the call counts verified below.
                return super.newSender(logContext, kafkaClient, new Metadata(0L, 100000L, true));
            }
        };

        // Close in finally so a failed verification does not leak the producer.
        try {
            producer.send(record);

            Mockito.verify(metadata, Mockito.times(2)).requestUpdate();
            Mockito.verify(metadata, Mockito.times(2)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
            Mockito.verify(metadata, Mockito.times(3)).fetch();
        } finally {
            producer.close(0L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Sends a record addressed to partition 2 while the mocked {@link Metadata} only ever reports the
     * one-partition cluster. On the fifth {@code fetch()} the mock clock is advanced by 70 s, which
     * exceeds the configured {@code max.block.ms} of 60 s. The test expects four
     * {@code requestUpdate}/{@code awaitUpdate} calls, five {@code fetch} calls, and a returned future
     * that fails with a Kafka {@code TimeoutException} as its cause.
     */
    @Test
    public void testMetadataTimeoutWithPartitionOutOfRange() throws Exception {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", 60000);

        // Create a record with a partition higher than the initial (outdated) partition range
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 2, null, "value");
        Metadata metadata = Mockito.mock(Metadata.class);

        MockTime mockTime = new MockTime();
        AtomicInteger invocationCount = new AtomicInteger(0);
        Mockito.when(metadata.fetch()).then(invocation -> {
            // Jump the clock past max.block.ms on the fifth fetch.
            if (invocationCount.incrementAndGet() == 5) {
                mockTime.setCurrentTimeMs(mockTime.milliseconds() + 70000L);
            }
            return onePartitionCluster;
        });

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs, new StringSerializer(),
                new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata), null, mockTime) {
            @Override
            Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata producerMetadata) {
                // The Sender gets its own Metadata instance rather than the mock, presumably so that
                // Sender activity does not disturb the call counts verified below.
                return super.newSender(logContext, kafkaClient, new Metadata(0L, 100000L, true));
            }
        };

        // Everything after construction runs inside try/finally so the producer is always closed.
        try {
            Future<?> future = producer.send(record);

            Mockito.verify(metadata, Mockito.times(4)).requestUpdate();
            Mockito.verify(metadata, Mockito.times(4)).awaitUpdate(ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
            Mockito.verify(metadata, Mockito.times(5)).fetch();

            try {
                future.get();
                // Without this the test would pass silently if the send did not time out.
                Assert.fail("Expected the send future to fail with a TimeoutException");
            } catch (ExecutionException e) {
                Assert.assertTrue(e.getCause() instanceof org.apache.kafka.common.errors.TimeoutException);
            }
        } finally {
            producer.close(0L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * While {@code partitionsFor} is waiting, a helper thread applies up to ten metadata updates that
     * contain an empty cluster, advancing the mock clock by 60 s after each one. The call is expected
     * to end in a Kafka {@code TimeoutException}, and the topic must still be present in the metadata
     * afterwards.
     */
    @Test
    public void testTopicRefreshInMetadata() throws InterruptedException {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.block.ms", "600000");

        final long refreshBackoffMs = 500L;
        final long metadataExpireMs = 60000L;
        final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, true, new ClusterResourceListeners());
        final MockTime time = new MockTime();
        final String topic = "topic";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
                new StringSerializer(), metadata, null, null, time)) {
            Thread updater = new Thread(() -> {
                final long startedAtMs = System.currentTimeMillis();
                for (int round = 0; round < 10; round++) {
                    // Spin until an update is requested, but for at most one wall-clock second in total.
                    while (!metadata.updateRequested() && System.currentTimeMillis() - startedAtMs < 1000L) {
                        Thread.yield();
                    }
                    metadata.update(Cluster.empty(), Collections.singleton(topic), time.milliseconds());
                    time.sleep(60000L);
                }
            });
            updater.start();

            try {
                producer.partitionsFor(topic);
                Assert.fail("Expect TimeoutException");
            } catch (org.apache.kafka.common.errors.TimeoutException expected) {
                // expected: the topic never shows up in the cluster
            }
            updater.join();
        }

        Assert.assertTrue("Topic should still exist in metadata", metadata.containsTopic(topic));
    }

    /**
     * Runs the shared header scenario with mocks of {@link ExtendedSerializer}.
     */
    @Test
    @Deprecated
    public void testHeadersWithExtendedClasses() {
        doTestHeaders(ExtendedSerializer.class);
    }

    /**
     * Runs the shared header scenario with mocks of the plain {@link Serializer} interface.
     */
    @Test
    public void testHeaders() {
        doTestHeaders(Serializer.class);
    }

    /**
     * Shared scenario for the header tests: sends a record carrying one header, checks that adding a
     * further header afterwards raises {@link IllegalStateException}, that the original header value is
     * unchanged, and that both mocked serializers were invoked with the topic, the record's headers and
     * the key or value respectively.
     *
     * @param serializerClassToMock serializer type to mock for both the key and the value serializer
     */
    private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerClassToMock) {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        Serializer<String> keySerializer = Mockito.mock(serializerClassToMock);
        Serializer<String> valueSerializer = Mockito.mock(serializerClassToMock);

        String topic = "topic";
        Node broker = new Node(0, "host1", 1000);
        PartitionInfo partition = new PartitionInfo(topic, 0, null, new Node[0], new Node[0]);
        Cluster cluster = new Cluster("dummy", Collections.singletonList(broker), Collections.singletonList(partition),
                Collections.emptySet(), Collections.emptySet());
        Metadata metadata = new Metadata(0L, 90000L, true);
        metadata.update(cluster, Collections.emptySet(), Time.SYSTEM.milliseconds());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs, keySerializer, valueSerializer, metadata,
                null, null, Time.SYSTEM);

        // Both mocks serialize by returning the bytes of their third argument.
        Mockito.when(keySerializer.serialize(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()))
                .then(invocation -> ((String) invocation.getArgument(2)).getBytes());
        Mockito.when(valueSerializer.serialize(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()))
                .then(invocation -> ((String) invocation.getArgument(2)).getBytes());

        String value = "value";
        String key = "key";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        // The header is added before the send, so this must succeed.
        record.headers().add(new RecordHeader("test", "header2".getBytes()));
        producer.send(record, null);

        // After the send, a further add is expected to be rejected.
        try {
            record.headers().add(new RecordHeader("test", "test".getBytes()));
            Assert.fail("Expected IllegalStateException to be raised");
        } catch (IllegalStateException expected) {
            // expected
        }

        // The header must still hold the value set before the send.
        Assert.assertArrayEquals(record.headers().lastHeader("test").value(), "header2".getBytes());

        Mockito.verify(valueSerializer).serialize(topic, record.headers(), value);
        Mockito.verify(keySerializer).serialize(topic, record.headers(), key);

        producer.close(0L, TimeUnit.MILLISECONDS);
    }

    /**
     * Calling {@code close()} twice on the same producer must not throw.
     */
    @Test
    public void closeShouldBeIdempotent() {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9000");
        KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
        producer.close();
        producer.close();
    }

    /**
     * The metrics recording level is {@code INFO} when {@code metrics.recording.level} is not set and
     * {@code DEBUG} when it is set to {@code "DEBUG"}. A fresh producer is built for each case and
     * closed through try-with-resources.
     */
    @Test
    public void testMetricConfigRecordingLevel() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9000");
        try (KafkaProducer<byte[], byte[]> producer =
                     new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
            Assert.assertEquals(Sensor.RecordingLevel.INFO, producer.metrics.config().recordLevel());
        }

        props.put("metrics.recording.level", "DEBUG");
        try (KafkaProducer<byte[], byte[]> producer =
                     new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
            Assert.assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel());
        }
    }

    /**
     * With {@code max.request.size} set to 1, sending a record must invoke the interceptors'
     * {@code onSend} with the record and {@code onSendError} with the record, a non-null
     * {@link TopicPartition} and a non-null exception.
     */
    @Test
    public void testInterceptorPartitionSetOnTooLargeRecord() {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("max.request.size", "1");

        String topic = "topic";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");

        Metadata metadata = new Metadata(0L, 90000L, true);
        Node broker = new Node(0, "host1", 1000);
        PartitionInfo partition = new PartitionInfo(topic, 0, null, new Node[0], new Node[0]);
        Cluster cluster = new Cluster("dummy", Collections.singletonList(broker), Collections.singletonList(partition),
                Collections.emptySet(), Collections.emptySet());
        metadata.update(cluster, Collections.emptySet(), Time.SYSTEM.milliseconds());

        @SuppressWarnings("unchecked") // safe: the value is a Mockito mock
        ProducerInterceptors<String, String> interceptors = Mockito.mock(ProducerInterceptors.class);
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
                new StringSerializer(), metadata, null, interceptors, Time.SYSTEM);

        // onSend hands back the record it was given.
        Mockito.when(interceptors.onSend(ArgumentMatchers.any())).then(invocation -> invocation.getArgument(0));

        producer.send(record);

        Mockito.verify(interceptors).onSend(record);
        Mockito.verify(interceptors).onSendError(ArgumentMatchers.eq(record), ArgumentMatchers.notNull(), ArgumentMatchers.notNull());

        producer.close(0L, TimeUnit.MILLISECONDS);
    }

    /**
     * {@code partitionsFor(null)} must raise a {@link NullPointerException}.
     */
    @Test
    public void testPartitionsForWithNullTopic() {
        Properties config = new Properties();
        config.setProperty("bootstrap.servers", "localhost:9000");
        try (KafkaProducer<byte[], byte[]> producer =
                     new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer())) {
            producer.partitionsFor(null);
            Assert.fail("Expected NullPointerException to be raised");
        } catch (NullPointerException expected) {
            // expected
        }
    }

    /**
     * With {@code max.block.ms} set to 5 and no response prepared on the mock client,
     * {@code initTransactions()} is expected to throw a Kafka {@code TimeoutException}.
     */
    @Test(expected = org.apache.kafka.common.errors.TimeoutException.class)
    public void testInitTransactionTimeout() {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "bad-transaction");
        configs.put("max.block.ms", 5);
        configs.put("bootstrap.servers", "localhost:9000");

        Time time = Time.SYSTEM;
        Cluster cluster = TestUtils.singletonCluster("topic", 1);
        Node node = cluster.nodes().get(0);

        Metadata metadata = new Metadata(0L, Long.MAX_VALUE, true);
        metadata.update(cluster, Collections.emptySet(), time.milliseconds());

        MockClient client = new MockClient(time, metadata);
        client.setNode(node);

        try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
                new StringSerializer(), metadata, client, null, time)) {
            producer.initTransactions();
            Assert.fail("initTransactions() should have raised TimeoutException");
        }
    }

    /**
     * Calls {@code initTransactions()}, ignoring a Kafka {@code TimeoutException} if one is thrown, and
     * then expects {@code beginTransaction()} to fail with a {@link KafkaException}. The producer is
     * closed in a {@code finally} block either way.
     */
    @Test(expected = KafkaException.class)
    public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put("transactional.id", "bad-transaction");
        configs.put("max.block.ms", 5);
        configs.put("bootstrap.servers", "localhost:9000");

        MockTime time = new MockTime();
        Cluster cluster = TestUtils.singletonCluster("topic", 1);
        Node node = cluster.nodes().get(0);

        Metadata metadata = new Metadata(0L, Long.MAX_VALUE, true);
        metadata.update(cluster, Collections.emptySet(), time.milliseconds());

        MockClient client = new MockClient(time, metadata);
        client.setNode(node);

        Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
                metadata, client, null, time);
        try {
            producer.initTransactions();
        } catch (org.apache.kafka.common.errors.TimeoutException expected) {
            // A timeout here is tolerated; the assertion is on the call that follows.
        }

        try {
            producer.beginTransaction();
        } finally {
            producer.close(0L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Sends to the topic name {@code "topic abc"} after preparing a metadata response that lists it as
     * invalid. The metadata's invalid-topic set must match the prepared response and the send future
     * must fail with {@link InvalidTopicException}.
     */
    @Test
    public void testSendToInvalidTopic() throws Exception {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9000");
        configs.put("max.block.ms", "15000");

        MockTime time = new MockTime();
        Cluster cluster = TestUtils.singletonCluster();
        Node node = cluster.nodes().get(0);

        Metadata metadata = new Metadata(0L, Long.MAX_VALUE, true);
        metadata.update(cluster, Collections.emptySet(), time.milliseconds());

        MockClient client = new MockClient(time, metadata);
        client.setNode(node);

        Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
                metadata, client, null, time);

        String invalidTopicName = "topic abc"; // Invalid topic name due to space
        ProducerRecord<String, String> record = new ProducerRecord<>(invalidTopicName, "HelloKafka");

        HashSet<String> invalidTopic = new HashSet<>();
        invalidTopic.add(invalidTopicName);

        // Same cluster id, nodes, internal topics and controller as the initial cluster, but with no
        // partitions and with the topic listed as invalid.
        Cluster metaDataUpdateResponseCluster = new Cluster(
                cluster.clusterResource().clusterId(),
                cluster.nodes(),
                new ArrayList<>(0),
                Collections.emptySet(),
                invalidTopic,
                cluster.internalTopics(),
                cluster.controller());
        client.prepareMetadataUpdate(metaDataUpdateResponseCluster, Collections.emptySet());

        Future<?> future = producer.send(record);

        Assert.assertEquals("Cluster has incorrect invalid topic list",
                metaDataUpdateResponseCluster.invalidTopics(), metadata.fetch().invalidTopics());
        TestUtils.assertFutureError(future, InvalidTopicException.class);

        producer.close(0L, TimeUnit.MILLISECONDS);
    }

    /**
     * Starts a {@code send} on a worker thread, waits until the topic has been added to the metadata,
     * closes the producer with a zero timeout and then expects the send to have failed with an exception
     * whose class is exactly {@link KafkaException}.
     */
    @Test
    public void testCloseWhenWaitingForMetadataUpdate() throws InterruptedException {
        HashMap<String, Object> configs = new HashMap<>();
        configs.put("max.block.ms", Long.MAX_VALUE);
        configs.put("bootstrap.servers", "localhost:9000");

        String topicName = "test";
        MockTime time = new MockTime();
        Cluster cluster = TestUtils.singletonCluster();
        Node node = cluster.nodes().get(0);

        Metadata metadata = new Metadata(0L, Long.MAX_VALUE, false);
        metadata.update(cluster, Collections.emptySet(), time.milliseconds());

        MockClient client = new MockClient(time, metadata);
        client.setNode(node);

        Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer(),
                metadata, client, null, time);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<Exception> sendException = new AtomicReference<>();
        try {
            executor.submit(() -> {
                try {
                    // A normal return from send() is treated as a failure; the expected outcome is
                    // an exception, which is recorded for the assertions below.
                    producer.send(new ProducerRecord<>(topicName, "key", "value"));
                    Assert.fail();
                } catch (Exception e) {
                    sendException.set(e);
                }
            });

            // Wait until metadata update for the topic has been requested
            TestUtils.waitForCondition(() -> metadata.containsTopic(topicName), "Timeout when waiting for topic to be added to metadata");
            producer.close(0L, TimeUnit.MILLISECONDS);
            TestUtils.waitForCondition(() -> sendException.get() != null, "No producer exception within timeout");
            Assert.assertEquals(KafkaException.class, sendException.get().getClass());
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Compiler-generated lambda body used by {@code testCloseWhenWaitingForMetadataUpdate}: sends one
     * record and stores any {@link Exception} thrown by the send in {@code sendException}. A normal
     * return from {@code send} triggers {@code Assert.fail()}.
     */
    private static /* synthetic */ void lambda$testCloseWhenWaitingForMetadataUpdate$8(Producer producer, String topicName, AtomicReference sendException) {
        try {
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "key", "value");
            producer.send(record);
            Assert.fail();
        } catch (Exception failure) {
            sendException.set(failure);
        }
    }

    /**
     * Compiler-generated lambda body used by {@code shouldCloseProperlyAndThrowIfInterrupted}: sends
     * one record, then calls {@code close()} and stores any {@link Exception} it throws in
     * {@code closeException}. A normal return from {@code close()} triggers {@code Assert.fail}.
     */
    private static /* synthetic */ void lambda$shouldCloseProperlyAndThrowIfInterrupted$0(Producer producer, AtomicReference closeException) {
        ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
        producer.send(record);
        try {
            producer.close();
            Assert.fail("Close should block and throw.");
        } catch (Exception failure) {
            closeException.set(failure);
        }
    }
}

