package org.apache.atlas.storm.hook;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.storm.model.StormDataTypes;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.atlas.utils.HdfsNameServiceResolver;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.storm.ISubmitterHook;
import org.apache.storm.generated.Bolt;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/atlas/storm/hook/StormAtlasHook.class */
/**
 * Storm submitter hook that converts a submitted topology into Atlas entities and
 * publishes them as a single entity-create notification.
 *
 * <p>For every submission the hook builds:
 * <ul>
 *   <li>one topology entity (id, name, owner, start time, cluster name);</li>
 *   <li>the data sets read by spouts ("inputs") and written by terminal bolts ("outputs"),
 *       for the component types recognised by {@link #addDataSet};</li>
 *   <li>one node entity per spout and bolt, wired together through their own
 *       "inputs"/"outputs" name lists.</li>
 * </ul>
 */
public class StormAtlasHook extends AtlasHook implements ISubmitterHook {
    public static final Logger LOG = LoggerFactory.getLogger(StormAtlasHook.class);
    public static final String ANONYMOUS_OWNER = "anonymous";
    public static final String HBASE_NAMESPACE_DEFAULT = "default";
    public static final String ATTRIBUTE_DB = "db";

    /** Property key holding the cluster name, both in the Atlas properties and in component configurations. */
    private static final String CLUSTER_NAME_KEY = "atlas.cluster.name";
    /** Cluster name used when {@link #CLUSTER_NAME_KEY} is not configured. */
    private static final String DEFAULT_CLUSTER_NAME = "primary";

    /**
     * Collects metadata for the submitted topology and sends it to Atlas.
     *
     * @param topologyInfo  runtime information about the submitted topology (id, name, owner)
     * @param map           the Storm configuration of the submission
     * @param stormTopology the topology definition (spouts and bolts)
     * @throws RuntimeException wrapping any failure raised while building or sending the entities
     */
    @Override
    public void notify(TopologyInfo topologyInfo, Map map, StormTopology stormTopology) {
        LOG.info("Collecting metadata for a new storm topology: {}", topologyInfo.get_name());
        try {
            String user = getUser(topologyInfo.get_owner(), null);
            AtlasEntity topology = createTopologyInstance(topologyInfo, map);
            AtlasEntity.AtlasEntitiesWithExtInfo entities = new AtlasEntity.AtlasEntitiesWithExtInfo(topology);

            addTopologyDataSets(stormTopology, topologyInfo.get_owner(), map, topology, entities);

            List<AtlasEntity> graphNodes =
                    createTopologyGraph(stormTopology, stormTopology.get_spouts(), stormTopology.get_bolts());
            if (CollectionUtils.isNotEmpty(graphNodes)) {
                topology.setAttribute("nodes", AtlasTypeUtil.getAtlasObjectIds(graphNodes));
                for (AtlasEntity node : graphNodes) {
                    entities.addReferredEntity(node);
                }
            }

            notifyEntities(Collections.singletonList(new HookNotification.EntityCreateRequestV2(user, entities)), null);
        } catch (Exception e) {
            throw new RuntimeException("Atlas hook is unable to process the topology.", e);
        }
    }

    /**
     * Builds the topology entity itself.
     *
     * @param topologyInfo source of the topology id, name and owner
     * @param stormConf    the Storm configuration, forwarded to {@link #getClusterName}
     * @return the populated topology entity; the owner falls back to {@link #ANONYMOUS_OWNER} when empty
     */
    private AtlasEntity createTopologyInstance(TopologyInfo topologyInfo, Map<?, ?> stormConf) {
        String owner = topologyInfo.get_owner();
        if (StringUtils.isEmpty(owner)) {
            owner = ANONYMOUS_OWNER;
        }

        AtlasEntity topology = new AtlasEntity(StormDataTypes.STORM_TOPOLOGY.getName());
        topology.setAttribute("id", topologyInfo.get_id());
        topology.setAttribute("name", topologyInfo.get_name());
        topology.setAttribute("qualifiedName", topologyInfo.get_name());
        topology.setAttribute("owner", owner);
        topology.setAttribute("startTime", new Date());
        topology.setAttribute("clusterName", getClusterName(stormConf));
        return topology;
    }

    /** Populates the topology's "inputs" (from spouts) and "outputs" (from terminal bolts). */
    private void addTopologyDataSets(StormTopology stormTopology, String topologyOwner, Map<?, ?> stormConf,
                                     AtlasEntity topology, AtlasEntity.AtlasEntityExtInfo entityExtInfo) {
        addTopologyInputs(stormTopology.get_spouts(), stormConf, topologyOwner, topology, entityExtInfo);
        addTopologyOutputs(stormTopology, topologyOwner, stormConf, topology, entityExtInfo);
    }

    /**
     * Sets the topology's "inputs" attribute to the data sets derived from its spouts.
     * Spouts of an unrecognised type contribute nothing.
     */
    private void addTopologyInputs(Map<String, SpoutSpec> spouts, Map<?, ?> stormConf, String topologyOwner,
                                   AtlasEntity topology, AtlasEntity.AtlasEntityExtInfo entityExtInfo) {
        List<AtlasEntity> inputs = new ArrayList<>();
        for (SpoutSpec spoutSpec : spouts.values()) {
            Serializable spout = deserializeComponent(spoutSpec.get_spout_object().get_serialized_java());
            AtlasEntity dataSet =
                    addDataSet(spout.getClass().getSimpleName(), topologyOwner, spout, stormConf, entityExtInfo);
            if (dataSet != null) {
                inputs.add(dataSet);
            }
        }
        topology.setAttribute("inputs", AtlasTypeUtil.getAtlasObjectIds(inputs));
    }

    /**
     * Sets the topology's "outputs" attribute to the data sets derived from the bolts named by
     * {@code StormTopologyUtil.getTerminalUserBoltNames}. Bolts of an unrecognised type contribute nothing.
     */
    private void addTopologyOutputs(StormTopology stormTopology, String topologyOwner, Map<?, ?> stormConf,
                                    AtlasEntity topology, AtlasEntity.AtlasEntityExtInfo entityExtInfo) {
        List<AtlasEntity> outputs = new ArrayList<>();
        Map<String, Bolt> bolts = stormTopology.get_bolts();
        for (String boltName : StormTopologyUtil.getTerminalUserBoltNames(stormTopology)) {
            Serializable bolt = deserializeComponent(bolts.get(boltName).get_bolt_object().get_serialized_java());
            AtlasEntity dataSet =
                    addDataSet(bolt.getClass().getSimpleName(), topologyOwner, bolt, stormConf, entityExtInfo);
            if (dataSet != null) {
                outputs.add(dataSet);
            }
        }
        topology.setAttribute("outputs", AtlasTypeUtil.getAtlasObjectIds(outputs));
    }

    /**
     * Creates the data-set entity for a spout/bolt instance and registers it as a referred entity.
     *
     * @param dataSetType   simple class name of the component; recognised values are
     *                      {@code KafkaSpout}, {@code HBaseBolt}, {@code HdfsBolt} and {@code HiveBolt}
     * @param topologyOwner owner of the topology (may be empty)
     * @param instance      the deserialized component whose fields describe the data set
     * @param stormConf     the Storm configuration
     * @param entityExtInfo collector that receives the created entity (and, for Hive, its database)
     * @return the created entity, or {@code null} when the type is not recognised or the
     *         component lacks the fields needed to identify the data set
     */
    private AtlasEntity addDataSet(String dataSetType, String topologyOwner, Serializable instance,
                                   Map<?, ?> stormConf, AtlasEntity.AtlasEntityExtInfo entityExtInfo) {
        // Extracted before dispatching, as in the original flow, so extraction failures surface for every type.
        Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true, null);

        AtlasEntity dataSet;
        switch (dataSetType) {
            case "KafkaSpout":
                dataSet = createKafkaTopic(config, topologyOwner, stormConf);
                break;
            case "HBaseBolt":
                dataSet = createHBaseTable(config, stormConf);
                break;
            case "HdfsBolt":
                dataSet = createHdfsPath(config, stormConf);
                break;
            case "HiveBolt":
                dataSet = createHiveTable(config, stormConf, entityExtInfo);
                break;
            default:
                return null;
        }

        if (dataSet != null) {
            entityExtInfo.addReferredEntity(dataSet);
        }
        return dataSet;
    }

    /** Builds the Kafka topic entity for a {@code KafkaSpout}, or returns {@code null} when no topic is configured. */
    private AtlasEntity createKafkaTopic(Map<String, String> config, String topologyOwner, Map<?, ?> stormConf) {
        // Two sets of field paths are probed; the "_spoutConfig" ones are the fallback.
        String topicName = config.get("KafkaSpout.kafkaSpoutConfig.translator.topic");
        if (StringUtils.isEmpty(topicName)) {
            topicName = config.get("KafkaSpout._spoutConfig.topic");
        }
        String uri = config.get("KafkaSpout.kafkaSpoutConfig.kafkaProps.bootstrap.servers");
        if (StringUtils.isEmpty(uri)) {
            uri = config.get("KafkaSpout._spoutConfig.hosts.brokerZkStr");
        }

        if (topicName == null) {
            LOG.error("Kafka topic name not found");
            return null;
        }

        String owner = StringUtils.isEmpty(topologyOwner) ? ANONYMOUS_OWNER : topologyOwner;

        AtlasEntity topic = new AtlasEntity(StormDataTypes.KAFKA_TOPIC.getName());
        topic.setAttribute("topic", topicName);
        topic.setAttribute("uri", uri);
        topic.setAttribute("owner", owner);
        topic.setAttribute("qualifiedName", getKafkaTopicQualifiedName(getClusterName(stormConf), topicName));
        topic.setAttribute("name", topicName);
        return topic;
    }

    /** Builds the HBase table entity for an {@code HBaseBolt}, or returns {@code null} when no table is configured. */
    private AtlasEntity createHBaseTable(Map<String, String> config, Map<?, ?> stormConf) {
        String tableName = config.get("HBaseBolt.tableName");
        if (tableName == null) {
            LOG.error("HBase table name not found");
            return null;
        }

        String rootDirOrTable = config.get("hbase.rootdir");
        if (StringUtils.isEmpty(rootDirOrTable)) {
            rootDirOrTable = tableName;
        }
        String clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf);

        AtlasEntity table = new AtlasEntity(StormDataTypes.HBASE_TABLE.getName());
        // NOTE(review): "uri" receives the table name and "name" receives hbase.rootdir (or the table
        // name when unset). This looks swapped but is preserved as-is - confirm before changing.
        table.setAttribute("uri", tableName);
        table.setAttribute("name", rootDirOrTable);
        table.setAttribute("owner", stormConf.get("storm.kerberos.principal"));
        table.setAttribute("qualifiedName", getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, tableName));
        return table;
    }

    /**
     * Builds the {@code hdfs_path} entity for an {@code HdfsBolt}, or returns {@code null} when the
     * file-system URL or the path cannot be found among the bolt's fields.
     */
    private AtlasEntity createHdfsPath(Map<String, String> config, Map<?, ?> stormConf) {
        String fsUrl = config.get("HdfsBolt.fsUrl");
        String rotationActions = config.get("HdfsBolt.rotationActions");
        String pathSuffix = rotationActions != null ? rotationActions : config.get("HdfsBolt.fileNameFormat.path");
        if (fsUrl == null || pathSuffix == null) {
            // Without this guard the concatenation below would register a path containing the text "null".
            LOG.error("HDFS url or path not found");
            return null;
        }

        String hdfsPath = fsUrl + pathSuffix;
        Path path = new Path(hdfsPath);
        String nameServiceId = HdfsNameServiceResolver.getNameServiceIDForPath(hdfsPath);
        String clusterName = getClusterName(stormConf);

        AtlasEntity hdfs = new AtlasEntity("hdfs_path");
        hdfs.setAttribute("clusterName", clusterName);
        hdfs.setAttribute("owner", stormConf.get("hdfs.kerberos.principal"));
        hdfs.setAttribute("name", Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase());

        if (StringUtils.isNotEmpty(nameServiceId)) {
            // A name service id was resolved: record the path rewritten with that id.
            String pathWithNameServiceId = HdfsNameServiceResolver.getPathWithNameServiceID(hdfsPath);
            hdfs.setAttribute("path", pathWithNameServiceId);
            hdfs.setAttribute("nameServiceId", nameServiceId);
            hdfs.setAttribute("qualifiedName", getHdfsPathQualifiedName(clusterName, pathWithNameServiceId));
        } else {
            hdfs.setAttribute("path", hdfsPath);
            hdfs.setAttribute("qualifiedName", getHdfsPathQualifiedName(clusterName, hdfsPath));
        }
        return hdfs;
    }

    /**
     * Builds the {@code hive_table} entity for a {@code HiveBolt} and registers its {@code hive_db}
     * as a referred entity. Returns {@code null} when the database or table name is missing.
     */
    private AtlasEntity createHiveTable(Map<String, String> config, Map<?, ?> stormConf,
                                        AtlasEntity.AtlasEntityExtInfo entityExtInfo) {
        String dbName = config.get("HiveBolt.options.databaseName");
        String tableName = config.get("HiveBolt.options.tableName");
        if (dbName == null || tableName == null) {
            LOG.error("Hive database or table name not found");
            return null;
        }

        String hiveClusterName = extractComponentClusterName(new HiveConf(), stormConf);
        String clusterName = getClusterName(stormConf);

        // NOTE(review): the database is qualified with the hook's cluster name while the table uses the
        // cluster name from the Hive configuration; preserved as-is - confirm this is intended.
        AtlasEntity database = new AtlasEntity("hive_db");
        database.setAttribute("name", dbName);
        database.setAttribute("qualifiedName", HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
        database.setAttribute("clusterName", clusterName);
        entityExtInfo.addReferredEntity(database);

        AtlasEntity table = new AtlasEntity("hive_table");
        table.setAttribute("name", tableName);
        table.setAttribute(ATTRIBUTE_DB, AtlasTypeUtil.getAtlasObjectId(database));
        table.setAttribute("qualifiedName",
                HiveMetaStoreBridge.getTableQualifiedName(hiveClusterName, dbName, tableName));
        return table;
    }

    /**
     * Creates one node entity per spout and bolt and links them according to the topology's adjacency map.
     *
     * @return the node entities (iteration order is that of the backing {@link HashMap})
     */
    private List<AtlasEntity> createTopologyGraph(StormTopology stormTopology, Map<String, SpoutSpec> spouts,
                                                  Map<String, Bolt> bolts) {
        Map<String, AtlasEntity> nodesByName = new HashMap<>();
        addSpouts(spouts, nodesByName);
        addBolts(bolts, nodesByName);
        addGraphConnections(stormTopology, nodesByName);
        return new ArrayList<>(nodesByName.values());
    }

    /** Adds a node entity for every spout, keyed by spout name. */
    private void addSpouts(Map<String, SpoutSpec> spouts, Map<String, AtlasEntity> nodeEntities) {
        for (Map.Entry<String, SpoutSpec> spout : spouts.entrySet()) {
            nodeEntities.put(spout.getKey(), createSpoutInstance(spout.getKey(), spout.getValue()));
        }
    }

    /** Adds a node entity for every bolt, keyed by bolt name. */
    private void addBolts(Map<String, Bolt> bolts, Map<String, AtlasEntity> nodeEntities) {
        for (Map.Entry<String, Bolt> bolt : bolts.entrySet()) {
            nodeEntities.put(bolt.getKey(), createBoltInstance(bolt.getKey(), bolt.getValue()));
        }
    }

    /** Builds the node entity describing a spout. */
    private AtlasEntity createSpoutInstance(String spoutName, SpoutSpec spoutSpec) {
        return createNodeInstance(StormDataTypes.STORM_SPOUT.getName(), spoutName,
                spoutSpec.get_spout_object().get_serialized_java());
    }

    /** Builds the node entity describing a bolt. */
    private AtlasEntity createBoltInstance(String boltName, Bolt bolt) {
        return createNodeInstance(StormDataTypes.STORM_BOLT.getName(), boltName,
                bolt.get_bolt_object().get_serialized_java());
    }

    /**
     * Shared builder for spout and bolt node entities: records the node name, the class name of the
     * deserialized component ("driverClass") and its extracted field values ("conf").
     */
    private AtlasEntity createNodeInstance(String typeName, String nodeName, byte[] serializedJava) {
        AtlasEntity node = new AtlasEntity(typeName);
        Serializable instance = deserializeComponent(serializedJava);
        Map<String, String> config = StormTopologyUtil.getFieldValues(instance, true, null);

        node.setAttribute("name", nodeName);
        node.setAttribute("driverClass", instance.getClass().getName());
        node.setAttribute("conf", config);
        return node;
    }

    /** Deserializes the Java-serialized component object carried by a spout or bolt specification. */
    private static Serializable deserializeComponent(byte[] serializedJava) {
        return (Serializable) Utils.javaDeserialize(serializedJava, Serializable.class);
    }

    /**
     * Wires node entities together: each source node gets an "outputs" list with the names of its
     * targets, and each target has the source name appended to its "inputs" list.
     */
    private void addGraphConnections(StormTopology stormTopology, Map<String, AtlasEntity> nodeEntities) {
        Map<String, Set<String>> adjacencyMap = StormTopologyUtil.getAdjacencyMap(stormTopology, true);
        for (Map.Entry<String, Set<String>> edge : adjacencyMap.entrySet()) {
            String sourceName = edge.getKey();
            Set<String> targetNames = edge.getValue();
            if (CollectionUtils.isEmpty(targetNames)) {
                continue;
            }

            nodeEntities.get(sourceName).setAttribute("outputs", new ArrayList<>(targetNames));

            for (String targetName : targetNames) {
                AtlasEntity target = nodeEntities.get(targetName);
                // "inputs" is only ever written by this method, always as a list of node names.
                @SuppressWarnings("unchecked")
                List<String> inputs = (List<String>) target.getAttribute("inputs");
                if (inputs == null) {
                    inputs = new ArrayList<>();
                }
                inputs.add(sourceName);
                target.setAttribute("inputs", inputs);
            }
        }
    }

    /**
     * Formats a Kafka topic qualified name as {@code <topic lower-cased>@<cluster>}.
     *
     * @param str  the cluster name
     * @param str2 the topic name
     */
    public static String getKafkaTopicQualifiedName(String str, String str2) {
        return String.format("%s@%s", str2.toLowerCase(), str);
    }

    /**
     * Formats an HBase table qualified name as {@code <namespace>.<table>@<cluster>}, with namespace
     * and table lower-cased.
     *
     * @param str  the cluster name
     * @param str2 the HBase namespace
     * @param str3 the table name
     */
    public static String getHbaseTableQualifiedName(String str, String str2, String str3) {
        return String.format("%s.%s@%s", str2.toLowerCase(), str3.toLowerCase(), str);
    }

    /**
     * Formats an HDFS path qualified name as {@code <path lower-cased>@<cluster>}.
     *
     * @param str  the cluster name
     * @param str2 the HDFS path
     */
    public static String getHdfsPathQualifiedName(String str, String str2) {
        return String.format("%s@%s", str2.toLowerCase(), str);
    }

    /**
     * Returns the cluster name from the Atlas properties, defaulting to "primary".
     *
     * @param stormConf currently unused; the value is read from the Atlas properties only
     */
    private String getClusterName(Map<?, ?> stormConf) {
        return atlasProperties.getString(CLUSTER_NAME_KEY, DEFAULT_CLUSTER_NAME);
    }

    /**
     * Returns the cluster name declared in a component's own configuration, falling back to
     * {@link #getClusterName} when the component configuration does not define one.
     */
    private String extractComponentClusterName(Configuration configuration, Map<?, ?> stormConf) {
        String clusterName = configuration.get(CLUSTER_NAME_KEY, (String) null);
        return clusterName != null ? clusterName : getClusterName(stormConf);
    }
}
