/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.util.Properties;
import java.util.Random;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.consumer.Blacklist;
import kafka.consumer.Consumer$;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerConnector;
import kafka.consumer.KafkaStream;
import kafka.consumer.TopicFilter;
import kafka.consumer.Whitelist;
import kafka.message.MessageAndMetadata;
import kafka.metrics.KafkaMetricsReporter$;
import kafka.serializer.DefaultDecoder;
import kafka.serializer.DefaultDecoder$;
import kafka.tools.DefaultMessageFormatter;
import kafka.tools.MessageFormatter;
import kafka.utils.CommandLineUtils$;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import kafka.utils.Logging$class;
import kafka.utils.Utils$;
import kafka.utils.VerifiableProperties;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils$;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.log4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.ScalaObject;
import scala.Serializable;
import scala.collection.Iterable;
import scala.collection.JavaConversions$;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BooleanRef;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public final class ConsoleConsumer$
implements Logging,
ScalaObject {
    public static final ConsoleConsumer$ MODULE$;
    private final String loggerName;
    private final Logger logger;
    private String logIdent;
    private final Log4jController$ kafka$utils$Logging$$log4jController;
    public volatile int bitmap$0;

    static {
        new ConsoleConsumer$();
    }

    public void main(String[] args) {
        Properties consumerProps$1;
        OptionParser parser = new OptionParser();
        ArgumentAcceptingOptionSpec topicIdOpt = parser.accepts("topic", "The topic id to consume on.").withRequiredArg().describedAs("topic").ofType(String.class);
        ArgumentAcceptingOptionSpec whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.").withRequiredArg().describedAs("whitelist").ofType(String.class);
        ArgumentAcceptingOptionSpec blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.").withRequiredArg().describedAs("blacklist").ofType(String.class);
        ArgumentAcceptingOptionSpec zkConnectOpt$1 = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.").withRequiredArg().describedAs("urls").ofType(String.class);
        ArgumentAcceptingOptionSpec consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.").withRequiredArg().describedAs("config file").ofType(String.class);
        ArgumentAcceptingOptionSpec messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.").withRequiredArg().describedAs("class").ofType(String.class).defaultsTo((Object)DefaultMessageFormatter.class.getName(), (Object[])new String[0]);
        ArgumentAcceptingOptionSpec messageFormatterArgOpt = parser.accepts("property").withRequiredArg().describedAs("prop").ofType(String.class);
        OptionSpecBuilder deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up");
        OptionSpecBuilder resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.");
        ArgumentAcceptingOptionSpec maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.").withRequiredArg().describedAs("num_messages").ofType(Integer.class);
        OptionSpecBuilder skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, skip it instead of halt.");
        OptionSpecBuilder csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled");
        ArgumentAcceptingOptionSpec metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter isset, the csv metrics will be outputed here").withRequiredArg().describedAs("metrics dictory").ofType(String.class);
        if (args.length == 0) {
            CommandLineUtils$.MODULE$.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.");
        }
        BooleanRef groupIdPassed$1 = new BooleanRef(true);
        OptionSet options$1 = this.tryParse(parser, args);
        CommandLineUtils$.MODULE$.checkRequiredArgs(parser, options$1, (Seq<OptionSpec<?>>)Predef$.MODULE$.wrapRefArray((Object[])new OptionSpec[]{zkConnectOpt$1}));
        List topicOrFilterOpt = (List)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new ArgumentAcceptingOptionSpec[]{topicIdOpt, whitelistOpt, blacklistOpt})).filter((Function1)new Serializable(options$1){
            public static final long serialVersionUID;
            private final OptionSet options$1;

            static {
                long l = serialVersionUID = 0L;
            }

            public final boolean apply(OptionSpec<?> optionSpec) {
                return this.options$1.has(optionSpec);
            }
            {
                this.options$1 = optionSet;
            }
        });
        if (topicOrFilterOpt.size() != 1) {
            CommandLineUtils$.MODULE$.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.");
        }
        String topicArg = (String)options$1.valueOf((OptionSpec)topicOrFilterOpt.head());
        TopicFilter filterSpec = options$1.has((OptionSpec)blacklistOpt) ? new Blacklist(topicArg) : new Whitelist(topicArg);
        boolean csvMetricsReporterEnabled = options$1.has((OptionSpec)csvMetricsReporterEnabledOpt);
        if (csvMetricsReporterEnabled) {
            Properties csvReporterProps = new Properties();
            csvReporterProps.put("kafka.metrics.polling.interval.secs", "5");
            csvReporterProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter");
            Object object = options$1.has((OptionSpec)metricsDirectoryOpt) ? csvReporterProps.put("kafka.csv.metrics.dir", options$1.valueOf((OptionSpec)metricsDirectoryOpt)) : csvReporterProps.put("kafka.csv.metrics.dir", "kafka_metrics");
            csvReporterProps.put("kafka.csv.metrics.reporter.enabled", "true");
            VerifiableProperties verifiableProps = new VerifiableProperties(csvReporterProps);
            KafkaMetricsReporter$.MODULE$.startReporters(verifiableProps);
        }
        Properties properties = consumerProps$1 = options$1.has((OptionSpec)consumerConfigOpt) ? Utils$.MODULE$.loadProps((String)options$1.valueOf((OptionSpec)consumerConfigOpt)) : new Properties();
        if (!consumerProps$1.containsKey("group.id")) {
            consumerProps$1.put("group.id", new StringBuilder().append((Object)"console-consumer-").append((Object)BoxesRunTime.boxToInteger((int)new Random().nextInt(100000))).toString());
            groupIdPassed$1.elem = false;
        }
        consumerProps$1.put("auto.offset.reset", options$1.has((OptionSpec)resetBeginningOpt) ? "smallest" : "largest");
        consumerProps$1.put("zookeeper.connect", options$1.valueOf((OptionSpec)zkConnectOpt$1));
        if (!options$1.has((OptionSpec)deleteConsumerOffsetsOpt) && options$1.has((OptionSpec)resetBeginningOpt) && this.checkZkPathExists((String)options$1.valueOf((OptionSpec)zkConnectOpt$1), new StringBuilder().append((Object)"/consumers/").append((Object)consumerProps$1.getProperty("group.id")).append((Object)"/offsets").toString())) {
            System.err.println(new StringBuilder().append((Object)"Found previous offset information for this group ").append((Object)consumerProps$1.getProperty("group.id")).append((Object)". Please use --delete-consumer-offsets to delete previous offsets metadata").toString());
            System.exit(1);
        }
        if (options$1.has((OptionSpec)deleteConsumerOffsetsOpt)) {
            ZkUtils$.MODULE$.maybeDeletePath((String)options$1.valueOf((OptionSpec)zkConnectOpt$1), new StringBuilder().append((Object)"/consumers/").append((Object)consumerProps$1.getProperty("group.id")).toString());
        }
        ConsumerConfig config = new ConsumerConfig(consumerProps$1);
        boolean skipMessageOnError$1 = options$1.has((OptionSpec)skipMessageOnErrorOpt);
        Class<?> messageFormatterClass = Class.forName((String)options$1.valueOf((OptionSpec)messageFormatterOpt));
        Properties formatterArgs = CommandLineUtils$.MODULE$.parseKeyValueArgs((Iterable<String>)JavaConversions$.MODULE$.asScalaBuffer(options$1.valuesOf((OptionSpec)messageFormatterArgOpt)));
        int maxMessages = options$1.has((OptionSpec)maxMessagesOpt) ? (Integer)options$1.valueOf((OptionSpec)maxMessagesOpt) : -1;
        ConsumerConnector connector$1 = Consumer$.MODULE$.create(config);
        Runtime.getRuntime().addShutdownHook(new Thread(zkConnectOpt$1, groupIdPassed$1, options$1, consumerProps$1, connector$1){
            private final ArgumentAcceptingOptionSpec zkConnectOpt$1;
            private final BooleanRef groupIdPassed$1;
            private final OptionSet options$1;
            private final Properties consumerProps$1;
            private final ConsumerConnector connector$1;

            public void run() {
                this.connector$1.shutdown();
                if (!this.groupIdPassed$1.elem) {
                    ZkUtils$.MODULE$.maybeDeletePath((String)this.options$1.valueOf((OptionSpec)this.zkConnectOpt$1), new StringBuilder().append((Object)"/consumers/").append(this.consumerProps$1.get("group.id")).toString());
                }
            }
            {
                this.zkConnectOpt$1 = argumentAcceptingOptionSpec;
                this.groupIdPassed$1 = booleanRef;
                this.options$1 = optionSet;
                this.consumerProps$1 = properties;
                this.connector$1 = consumerConnector;
            }
        });
        LongRef numMessages$1 = new LongRef(0L);
        MessageFormatter formatter$1 = (MessageFormatter)messageFormatterClass.newInstance();
        formatter$1.init(formatterArgs);
        try {
            KafkaStream stream = (KafkaStream)JavaConversions$.MODULE$.seqAsJavaList(connector$1.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(DefaultDecoder$.MODULE$.init$default$1()), new DefaultDecoder(DefaultDecoder$.MODULE$.init$default$1()))).get(0);
            KafkaStream iter = maxMessages >= 0 ? (Iterable)stream.slice(0, maxMessages) : stream;
            iter.foreach((Function1)new Serializable(skipMessageOnError$1, connector$1, numMessages$1, formatter$1){
                public static final long serialVersionUID;
                private final boolean skipMessageOnError$1;
                private final ConsumerConnector connector$1;
                private final LongRef numMessages$1;
                private final MessageFormatter formatter$1;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final void apply(MessageAndMetadata<byte[], byte[]> messageAndTopic) {
                    Throwable throwable2;
                    block3: {
                        try {
                            this.formatter$1.writeTo(messageAndTopic.key(), messageAndTopic.message(), System.out);
                            ++this.numMessages$1.elem;
                        }
                        catch (Throwable throwable2) {
                            if (!this.skipMessageOnError$1) break block3;
                            ConsoleConsumer$.MODULE$.error((Function0<String>)new Serializable(this){
                                public static final long serialVersionUID;

                                static {
                                    long l = serialVersionUID = 0L;
                                }

                                public final String apply() {
                                    return "Error processing message, skipping this message: ";
                                }
                            }, (Function0<Throwable>)new Serializable(this, throwable2){
                                public static final long serialVersionUID;
                                private final Throwable e$1;

                                static {
                                    long l = serialVersionUID = 0L;
                                }

                                public final Throwable apply() {
                                    return this.e$1;
                                }
                                {
                                    this.e$1 = throwable;
                                }
                            });
                        }
                        if (System.out.checkError()) {
                            System.err.println("Unable to write to standard out, closing consumer.");
                            System.err.println(Predef$.MODULE$.augmentString("Consumed %d messages").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)this.numMessages$1.elem)})));
                            this.formatter$1.close();
                            this.connector$1.shutdown();
                            System.exit(1);
                        }
                        return;
                    }
                    throw throwable2;
                }
                {
                    this.skipMessageOnError$1 = bl;
                    this.connector$1 = consumerConnector;
                    this.numMessages$1 = longRef;
                    this.formatter$1 = messageFormatter;
                }
            });
        }
        catch (Throwable throwable) {
            this.error((Function0<String>)new Serializable(){
                public static final long serialVersionUID;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final String apply() {
                    return "Error processing message, stopping consumer: ";
                }
            }, (Function0<Throwable>)new Serializable(throwable){
                public static final long serialVersionUID;
                private final Throwable e$2;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final Throwable apply() {
                    return this.e$2;
                }
                {
                    this.e$2 = throwable;
                }
            });
        }
        System.err.println(Predef$.MODULE$.augmentString("Consumed %d messages").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)numMessages$1.elem)})));
        System.out.flush();
        formatter$1.close();
        connector$1.shutdown();
    }

    /*
     * WARNING - void declaration
     */
    public OptionSet tryParse(OptionParser parser, String[] args) {
        void var3_3;
        OptionSet exceptionResult1 = null;
        try {
            exceptionResult1 = parser.parse(args);
        }
        catch (OptionException optionException) {
            Utils$.MODULE$.croak(optionException.getMessage());
            exceptionResult1 = null;
        }
        return var3_3;
    }

    /*
     * WARNING - void declaration
     */
    public boolean checkZkPathExists(String zkUrl, String path) {
        void var3_3;
        boolean exceptionResult2 = false;
        try {
            ZkClient zk = new ZkClient(zkUrl, 30000, 30000, (ZkSerializer)ZKStringSerializer$.MODULE$);
            exceptionResult2 = zk.exists(path);
        }
        catch (Throwable throwable) {
            exceptionResult2 = false;
        }
        return (boolean)var3_3;
    }

    private ConsoleConsumer$() {
        MODULE$ = this;
        Logging$class.$init$(this);
    }
}

