/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kafka;

import com.google.common.base.Preconditions;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.storm.kafka.Broker;
import org.apache.storm.kafka.trident.GlobalPartitionInformation;
import org.apache.storm.utils.Utils;
import org.apache.zookeeper.KeeperException;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DynamicBrokersReader {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class);
    private CuratorFramework _curator;
    private String _zkPath;
    private String _topic;
    private Boolean _isWildcardTopic;

    public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) {
        Preconditions.checkNotNull((Object)conf, (Object)"conf cannot be null");
        this.validateConfig(conf);
        Preconditions.checkNotNull((Object)zkStr, (Object)"zkString cannot be null");
        Preconditions.checkNotNull((Object)zkPath, (Object)"zkPath cannot be null");
        Preconditions.checkNotNull((Object)topic, (Object)"topic cannot be null");
        this._zkPath = zkPath;
        this._topic = topic;
        this._isWildcardTopic = Utils.getBoolean(conf.get("kafka.topic.wildcard.match"), (boolean)false);
        try {
            this._curator = CuratorFrameworkFactory.newClient((String)zkStr, (int)Utils.getInt(conf.get("storm.zookeeper.session.timeout")), (int)Utils.getInt(conf.get("storm.zookeeper.connection.timeout")), (RetryPolicy)new RetryNTimes(Utils.getInt(conf.get("storm.zookeeper.retry.times")).intValue(), Utils.getInt(conf.get("storm.zookeeper.retry.interval")).intValue()));
            this._curator.start();
        }
        catch (Exception ex) {
            LOG.error("Couldn't connect to zookeeper", (Throwable)ex);
            throw new RuntimeException(ex);
        }
    }

    public List<GlobalPartitionInformation> getBrokerInfo() throws SocketTimeoutException {
        List<String> topics = this.getTopics();
        ArrayList<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
        for (String topic : topics) {
            GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic, this._isWildcardTopic);
            try {
                int numPartitionsForTopic = this.getNumPartitions(topic);
                String brokerInfoPath = this.brokerPath();
                for (int partition = 0; partition < numPartitionsForTopic; ++partition) {
                    int leader = this.getLeaderFor(topic, partition);
                    String path = brokerInfoPath + "/" + leader;
                    try {
                        byte[] brokerData = (byte[])this._curator.getData().forPath(path);
                        Broker hp = this.getBrokerHost(brokerData);
                        globalPartitionInformation.addPartition(partition, hp);
                        continue;
                    }
                    catch (KeeperException.NoNodeException e) {
                        LOG.error("Node {} does not exist ", (Object)path);
                    }
                }
            }
            catch (SocketTimeoutException e) {
                throw e;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
            partitions.add(globalPartitionInformation);
        }
        return partitions;
    }

    private int getNumPartitions(String topic) {
        try {
            String topicBrokersPath = this.partitionPath(topic);
            List children = (List)this._curator.getChildren().forPath(topicBrokersPath);
            return children.size();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private List<String> getTopics() {
        ArrayList<String> topics = new ArrayList<String>();
        if (!this._isWildcardTopic.booleanValue()) {
            topics.add(this._topic);
            return topics;
        }
        try {
            List children = (List)this._curator.getChildren().forPath(this.topicsPath());
            for (String t : children) {
                if (!t.matches(this._topic)) continue;
                LOG.info(String.format("Found matching topic %s", t));
                topics.add(t);
            }
            return topics;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public String topicsPath() {
        return this._zkPath + "/topics";
    }

    public String partitionPath(String topic) {
        return this.topicsPath() + "/" + topic + "/partitions";
    }

    public String brokerPath() {
        return this._zkPath + "/ids";
    }

    private int getLeaderFor(String topic, long partition) {
        try {
            String topicBrokersPath = this.partitionPath(topic);
            byte[] hostPortData = (byte[])this._curator.getData().forPath(topicBrokersPath + "/" + partition + "/state");
            Map value = (Map)JSONValue.parseWithException((String)new String(hostPortData, "UTF-8"));
            Integer leader = ((Number)value.get("leader")).intValue();
            if (leader == -1) {
                throw new RuntimeException("No leader found for partition " + partition);
            }
            return leader;
        }
        catch (RuntimeException e) {
            throw e;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void close() {
        this._curator.close();
    }

    private Broker getBrokerHost(byte[] contents) {
        try {
            Map value = (Map)JSONValue.parseWithException((String)new String(contents, "UTF-8"));
            String host = (String)value.get("host");
            Integer port = ((Long)value.get("port")).intValue();
            return new Broker(host, port);
        }
        catch (RuntimeException e) {
            throw e;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void validateConfig(Map conf) {
        Preconditions.checkNotNull(conf.get("storm.zookeeper.session.timeout"), (String)"%s cannot be null", (Object[])new Object[]{"storm.zookeeper.session.timeout"});
        Preconditions.checkNotNull(conf.get("storm.zookeeper.connection.timeout"), (String)"%s cannot be null", (Object[])new Object[]{"storm.zookeeper.connection.timeout"});
        Preconditions.checkNotNull(conf.get("storm.zookeeper.retry.times"), (String)"%s cannot be null", (Object[])new Object[]{"storm.zookeeper.retry.times"});
        Preconditions.checkNotNull(conf.get("storm.zookeeper.retry.interval"), (String)"%s cannot be null", (Object[])new Object[]{"storm.zookeeper.retry.interval"});
    }
}

