/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.InputStreamReader;
import java.util.Properties;
import java.util.Random;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.consumer.ZookeeperConsumerConnector;
import kafka.log.FileMessageSet;
import kafka.log.Log$;
import kafka.message.MessageAndMetadata;
import kafka.message.MessageAndOffset;
import kafka.serializer.Decoder;
import kafka.serializer.StringDecoder;
import kafka.serializer.StringDecoder$;
import kafka.tools.TestLogCleaning$;
import kafka.tools.TestRecord;
import kafka.utils.CommandLineUtils$;
import kafka.utils.IteratorTemplate;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.io.Codec$;
import scala.io.Source$;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong;

public final class TestLogCleaning$ {
    public static final TestLogCleaning$ MODULE$;

    static {
        new TestLogCleaning$();
    }

    public void main(String[] args) {
        OptionParser parser = new OptionParser();
        ArgumentAcceptingOptionSpec numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.").withRequiredArg().describedAs("count").ofType(Long.class).defaultsTo((Object)Predef$.MODULE$.long2Long(Long.MAX_VALUE), (Object[])new Long[0]);
        ArgumentAcceptingOptionSpec messageCompressionOpt = parser.accepts("compression-type", "message compression type").withOptionalArg().describedAs("compressionType").ofType(String.class).defaultsTo((Object)"none", (Object[])new String[0]);
        ArgumentAcceptingOptionSpec numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.").withRequiredArg().describedAs("count").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(5), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec brokerOpt = parser.accepts("broker", "Url to connect to.").withRequiredArg().describedAs("url").ofType(String.class);
        ArgumentAcceptingOptionSpec topicsOpt = parser.accepts("topics", "The number of topics to test.").withRequiredArg().describedAs("count").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(1), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates that are deletes.").withRequiredArg().describedAs("percent").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(0), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec zkConnectOpt = parser.accepts("zk", "Zk url.").withRequiredArg().describedAs("url").ofType(String.class);
        ArgumentAcceptingOptionSpec sleepSecsOpt = parser.accepts("sleep", "Time to sleep between production and consumption.").withRequiredArg().describedAs("ms").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(0), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec dumpOpt = parser.accepts("dump", "Dump the message contents of a topic partition that contains test data from this test to standard out.").withRequiredArg().describedAs("directory").ofType(String.class);
        OptionSet options = parser.parse(args);
        if (args.length == 0) {
            throw CommandLineUtils$.MODULE$.printUsageAndDie(parser, "An integration test for log cleaning.");
        }
        if (options.has((OptionSpec)dumpOpt)) {
            this.dumpLog(new File((String)options.valueOf((OptionSpec)dumpOpt)));
            System.exit(0);
        }
        CommandLineUtils$.MODULE$.checkRequiredArgs(parser, options, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new OptionSpec[]{brokerOpt, zkConnectOpt, numMessagesOpt}));
        long messages2 = (Long)options.valueOf((OptionSpec)numMessagesOpt);
        String compressionType = (String)options.valueOf((OptionSpec)messageCompressionOpt);
        int percentDeletes = (Integer)options.valueOf((OptionSpec)percentDeletesOpt);
        int dups = (Integer)options.valueOf((OptionSpec)numDupsOpt);
        String brokerUrl = (String)options.valueOf((OptionSpec)brokerOpt);
        int topicCount = (Integer)options.valueOf((OptionSpec)topicsOpt);
        String zkUrl = (String)options.valueOf((OptionSpec)zkConnectOpt);
        int sleepSecs = (Integer)options.valueOf((OptionSpec)sleepSecsOpt);
        int testId = new Random().nextInt(Integer.MAX_VALUE);
        String[] topics = (String[])((TraversableOnce)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), topicCount).map((Function1)new Serializable(testId){
            public static final long serialVersionUID = 0L;
            private final int testId$1;

            public final String apply(int x$1) {
                return new StringBuilder().append((Object)"log-cleaner-test-").append((Object)BoxesRunTime.boxToInteger((int)this.testId$1)).append((Object)"-").append((Object)BoxesRunTime.boxToInteger((int)x$1)).toString();
            }
            {
                this.testId$1 = testId$1;
            }
        }, IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(String.class));
        Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString("Producing %d messages...")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)messages2)})));
        File producedDataFile = this.produceMessages(brokerUrl, topics, messages2, compressionType, dups, percentDeletes);
        Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString("Sleeping for %d seconds...")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)sleepSecs)})));
        Thread.sleep(sleepSecs * 1000);
        Predef$.MODULE$.println((Object)"Consuming messages...");
        File consumedDataFile = this.consumeMessages(zkUrl, topics);
        int producedLines = this.lineCount(producedDataFile);
        int consumedLines = this.lineCount(consumedDataFile);
        double reduction = 1.0 - (double)consumedLines / (double)producedLines;
        Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)producedLines), BoxesRunTime.boxToInteger((int)consumedLines), BoxesRunTime.boxToDouble((double)((double)100 * reduction))})));
        Predef$.MODULE$.println((Object)"De-duplicating and validating output files...");
        this.validateOutput(producedDataFile, consumedDataFile);
        producedDataFile.delete();
        consumedDataFile.delete();
    }

    public void dumpLog(File dir) {
        Predef$.MODULE$.require(dir.exists(), (Function0)new Serializable(dir){
            public static final long serialVersionUID = 0L;
            private final File dir$1;

            public final String apply() {
                return new StringBuilder().append((Object)"Non-existent directory: ").append((Object)this.dir$1.getAbsolutePath()).toString();
            }
            {
                this.dir$1 = dir$1;
            }
        });
        Predef$.MODULE$.refArrayOps((Object[])Predef$.MODULE$.refArrayOps((Object[])dir.list()).sorted((Ordering)Ordering.String$.MODULE$)).withFilter((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(String file) {
                return file.endsWith(Log$.MODULE$.LogFileSuffix());
            }
        }).foreach((Function1)new Serializable(dir){
            public static final long serialVersionUID = 0L;
            private final File dir$1;

            public final void apply(String file) {
                FileMessageSet ms = new FileMessageSet(new File(this.dir$1, file));
                ms.foreach((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;

                    public final void apply(MessageAndOffset entry) {
                        String key = TestUtils$.MODULE$.readString(entry.message().key(), TestUtils$.MODULE$.readString$default$2());
                        String content = entry.message().isNull() ? null : TestUtils$.MODULE$.readString(entry.message().payload(), TestUtils$.MODULE$.readString$default$2());
                        Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString("offset = %s, key = %s, content = %s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)entry.offset()), key, content})));
                    }
                });
            }
            {
                this.dir$1 = dir$1;
            }
        });
    }

    public int lineCount(File file) {
        return Source$.MODULE$.fromFile(file, Codec$.MODULE$.fallbackSystemCodec()).getLines().size();
    }

    public void validateOutput(File producedDataFile, File consumedDataFile) {
        BufferedReader producedReader = this.externalSort(producedDataFile);
        BufferedReader consumedReader = this.externalSort(consumedDataFile);
        IteratorTemplate<TestRecord> produced = this.valuesIterator(producedReader);
        IteratorTemplate<TestRecord> consumed = this.valuesIterator(consumedReader);
        File producedDedupedFile = new File(new StringBuilder().append((Object)producedDataFile.getAbsolutePath()).append((Object)".deduped").toString());
        BufferedWriter producedDeduped = new BufferedWriter(new FileWriter(producedDedupedFile), 0x100000);
        File consumedDedupedFile = new File(new StringBuilder().append((Object)consumedDataFile.getAbsolutePath()).append((Object)".deduped").toString());
        BufferedWriter consumedDeduped = new BufferedWriter(new FileWriter(consumedDedupedFile), 0x100000);
        int total = 0;
        int mismatched = 0;
        while (produced.hasNext() && consumed.hasNext()) {
            TestRecord p = (TestRecord)produced.next();
            producedDeduped.write(p.toString());
            producedDeduped.newLine();
            TestRecord c = (TestRecord)consumed.next();
            consumedDeduped.write(c.toString());
            consumedDeduped.newLine();
            TestRecord testRecord = p;
            TestRecord testRecord2 = c;
            if (testRecord == null ? testRecord2 != null : !((Object)testRecord).equals(testRecord2)) {
                ++mismatched;
            }
            ++total;
        }
        producedDeduped.close();
        consumedDeduped.close();
        Predef$.MODULE$.println((Object)new StringBuilder().append((Object)"Validated ").append((Object)BoxesRunTime.boxToInteger((int)total)).append((Object)" values, ").append((Object)BoxesRunTime.boxToInteger((int)mismatched)).append((Object)" mismatches.").toString());
        Predef$.MODULE$.require(!produced.hasNext(), (Function0)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Additional values produced not found in consumer log.";
            }
        });
        Predef$.MODULE$.require(!consumed.hasNext(), (Function0)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Additional values consumed not found in producer log.";
            }
        });
        Predef$.MODULE$.require(mismatched == 0, (Function0)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Non-zero number of row mismatches.";
            }
        });
        producedDedupedFile.delete();
        consumedDedupedFile.delete();
    }

    public IteratorTemplate<TestRecord> valuesIterator(BufferedReader reader) {
        return new IteratorTemplate<TestRecord>(reader){
            private final BufferedReader reader$1;

            public TestRecord makeNext() {
                TestRecord next = TestLogCleaning$.MODULE$.readNext(this.reader$1);
                while (next != null && next.delete()) {
                    next = TestLogCleaning$.MODULE$.readNext(this.reader$1);
                }
                return next == null ? (TestRecord)this.allDone() : next;
            }
            {
                this.reader$1 = reader$1;
            }
        };
    }

    public TestRecord readNext(BufferedReader reader) {
        String line = reader.readLine();
        if (line == null) {
            return null;
        }
        TestRecord curr = new TestRecord(line);
        while (true) {
            if ((line = this.peekLine(reader)) == null) {
                return curr;
            }
            TestRecord next = new TestRecord(line);
            if (next == null) break;
            String string = next.topicAndKey();
            String string2 = curr.topicAndKey();
            if (string != null ? !string.equals(string2) : string2 != null) break;
            curr = next;
            reader.readLine();
        }
        return curr;
    }

    /*
     * WARNING - void declaration
     */
    public String peekLine(BufferedReader reader) {
        void var2_2;
        reader.mark(4096);
        String line = reader.readLine();
        reader.reset();
        return var2_2;
    }

    public BufferedReader externalSort(File file) {
        ProcessBuilder builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", new StringBuilder().append((Object)"--temporary-directory=").append((Object)System.getProperty("java.io.tmpdir")).toString(), file.getAbsolutePath());
        Process process = builder.start();
        new Thread(process){
            private final Process process$1;

            public void run() {
                int exitCode = this.process$1.waitFor();
                if (exitCode != 0) {
                    System.err.println("Process exited abnormally.");
                    while (this.process$1.getErrorStream().available() > 0) {
                        System.err.write(this.process$1.getErrorStream().read());
                    }
                }
            }
            {
                this.process$1 = process$1;
            }
        }.start();
        return new BufferedReader(new InputStreamReader(process.getInputStream()), 0xA00000);
    }

    public File produceMessages(String brokerUrl, String[] topics, long messages2, String compressionType, int dups, int percentDeletes) {
        Properties producerProps = new Properties();
        producerProps.setProperty("block.on.buffer.full", "true");
        producerProps.setProperty("bootstrap.servers", brokerUrl);
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.setProperty("compression.type", compressionType);
        KafkaProducer producer = new KafkaProducer(producerProps);
        Random rand = new Random(1L);
        int keyCount = (int)(messages2 / (long)dups);
        File producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt");
        Predef$.MODULE$.println((Object)new StringBuilder().append((Object)"Logging produce requests to ").append((Object)producedFile.getAbsolutePath()).toString());
        BufferedWriter producedWriter = new BufferedWriter(new FileWriter(producedFile), 0x100000);
        new RichLong(Predef$.MODULE$.longWrapper(0L)).until((Object)BoxesRunTime.boxToLong((long)(messages2 * (long)topics.length))).foreach((Function1)new Serializable(topics, percentDeletes, producer, rand, keyCount, producedWriter){
            public static final long serialVersionUID = 0L;
            private final String[] topics$1;
            private final int percentDeletes$1;
            private final KafkaProducer producer$1;
            private final Random rand$1;
            private final int keyCount$1;
            private final BufferedWriter producedWriter$1;

            public final void apply(long i) {
                this.apply$mcVJ$sp(i);
            }

            public void apply$mcVJ$sp(long i) {
                String topic = this.topics$1[(int)(i % (long)this.topics$1.length)];
                int key = this.rand$1.nextInt(this.keyCount$1);
                boolean delete = i % 100L < (long)this.percentDeletes$1;
                ProducerRecord msg = delete ? new ProducerRecord(topic, (Object)((Object)BoxesRunTime.boxToInteger((int)key)).toString().getBytes(), null) : new ProducerRecord(topic, (Object)((Object)BoxesRunTime.boxToInteger((int)key)).toString().getBytes(), (Object)((Object)BoxesRunTime.boxToLong((long)i)).toString().getBytes());
                this.producer$1.send(msg);
                this.producedWriter$1.write(new TestRecord(topic, key, i, delete).toString());
                this.producedWriter$1.newLine();
            }
            {
                this.topics$1 = topics$1;
                this.percentDeletes$1 = percentDeletes$1;
                this.producer$1 = producer$1;
                this.rand$1 = rand$1;
                this.keyCount$1 = keyCount$1;
                this.producedWriter$1 = producedWriter$1;
            }
        });
        producedWriter.close();
        producer.close();
        return producedFile;
    }

    public ZookeeperConsumerConnector makeConsumer(String zkUrl, String[] topics) {
        Properties consumerProps = new Properties();
        consumerProps.setProperty("group.id", new StringBuilder().append((Object)"log-cleaner-test-").append((Object)BoxesRunTime.boxToInteger((int)new Random().nextInt(Integer.MAX_VALUE))).toString());
        consumerProps.setProperty("zookeeper.connect", zkUrl);
        consumerProps.setProperty("consumer.timeout.ms", ((Object)BoxesRunTime.boxToInteger((int)20000)).toString());
        consumerProps.setProperty("auto.offset.reset", "smallest");
        return new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps));
    }

    public File consumeMessages(String zkUrl, String[] topics) {
        ZookeeperConsumerConnector connector = this.makeConsumer(zkUrl, topics);
        Map streams = connector.createMessageStreams((Map)Predef$.MODULE$.refArrayOps((Object[])Predef$.MODULE$.refArrayOps((Object[])topics).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Tuple2<String, Object> apply(String topic) {
                return new Tuple2((Object)topic, (Object)BoxesRunTime.boxToInteger((int)1));
            }
        }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class)))).toMap(Predef$.MODULE$.$conforms()), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()), (Decoder)new StringDecoder(StringDecoder$.MODULE$.$lessinit$greater$default$1()));
        File consumedFile = File.createTempFile("kafka-log-cleaner-consumed-", ".txt");
        Predef$.MODULE$.println((Object)new StringBuilder().append((Object)"Logging consumed messages to ").append((Object)consumedFile.getAbsolutePath()).toString());
        BufferedWriter consumedWriter = new BufferedWriter(new FileWriter(consumedFile));
        Predef$.MODULE$.refArrayOps((Object[])topics).foreach((Function1)new Serializable(streams, consumedWriter){
            public static final long serialVersionUID = 0L;
            private final Map streams$1;
            public final BufferedWriter consumedWriter$1;

            public final void apply(String topic) {
                KafkaStream stream = (KafkaStream)((IterableLike)this.streams$1.apply((Object)topic)).head();
                try {
                    stream.foreach((Function1)new Serializable(this, topic){
                        public static final long serialVersionUID = 0L;
                        private final /* synthetic */ anonfun.consumeMessages.1 $outer;
                        private final String topic$1;

                        public final void apply(MessageAndMetadata<String, String> item) {
                            boolean delete = item.message() == null;
                            long value = delete ? -1L : new StringOps(Predef$.MODULE$.augmentString((String)item.message())).toLong();
                            this.$outer.consumedWriter$1.write(new TestRecord(this.topic$1, new StringOps(Predef$.MODULE$.augmentString((String)item.key())).toInt(), value, delete).toString());
                            this.$outer.consumedWriter$1.newLine();
                        }
                        {
                            if ($outer == null) {
                                throw null;
                            }
                            this.$outer = $outer;
                            this.topic$1 = topic$1;
                        }
                    });
                }
                catch (ConsumerTimeoutException consumerTimeoutException) {
                    // empty catch block
                }
            }
            {
                this.streams$1 = streams$1;
                this.consumedWriter$1 = consumedWriter$1;
            }
        });
        consumedWriter.close();
        connector.shutdown();
        return consumedFile;
    }

    private TestLogCleaning$() {
        MODULE$ = this;
    }
}

