package org.apache.atlas.notification;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import javax.inject.Inject;
import kafka.utils.ShutdownableThread;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.preprocessor.EntityPreprocessor;
import org.apache.atlas.notification.preprocessor.PreprocessorContext;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.service.Service;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.utils.LruCache;
import org.apache.atlas.v1.model.notification.HookNotificationV1;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.service.ServiceState;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@DependsOn({"atlasTypeDefStoreInitializer", "atlasTypeDefGraphStoreV2"})
@Component
@Order(4)
/* loaded from: input_file:WEB-INF/lib/atlas-webapp-1.2.0.jar:org/apache/atlas/notification/NotificationHookConsumer.class */
public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
    // HTTP-style status codes. NOTE(review): presumably the values recorded on audit
    // entries for processed / failed notifications (handleMessage uses the same numbers).
    private static final int SC_OK = 200;
    private static final int SC_BAD_REQUEST = 400;

    // Type and attribute names. NOTE(review): presumably used by the hive_column_lineage
    // skip logic configured below; that code is outside this part of the file.
    private static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage";
    private static final String ATTRIBUTE_INPUTS = "inputs";
    private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";

    // Keys looked up in the application configuration. The defaults for the keys that
    // are read in this part of the file are visible on the instance fields further down.
    public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
    public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
    public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
    public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
    public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval";
    public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval";
    public static final String CONSUMER_COMMIT_BATCH_SIZE = "atlas.notification.consumer.commit.batch.size";
    public static final String CONSUMER_DISABLED = "atlas.notification.consumer.disabled";
    public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
    public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";
    public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
    public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN = "atlas.notification.consumer.preprocess.hive_table.prune.pattern";
    public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE = "atlas.notification.consumer.preprocess.hive_table.cache.size";
    public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
    public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";

    // NOTE(review): presumably the pause between server-readiness checks in
    // HookConsumer.serverAvailable, which logs a 1000 ms wait - confirm there.
    public static final int SERVER_READY_WAIT_TIME_MS = 1000;

    // Collaborators and preprocessing settings. These finals have no initialiser here,
    // so they must be assigned by a constructor that is outside this part of the file.
    private final AtlasEntityStore atlasEntityStore;
    private final ServiceState serviceState;
    private final AtlasInstanceConverter instanceConverter;
    private final AtlasTypeRegistry typeRegistry;
    private final Map<String, PreprocessorContext.PreprocessAction> hiveTablesCache;
    private final boolean hiveTypesRemoveOwnedRefAttrs;
    private final boolean rdbmsTypesRemoveOwnedRefAttrs;
    private final boolean preprocessEnabled;
    private NotificationInterface notificationInterface;
    private ExecutorService executors;

    // Package-private (not private) so that tests can reach it.
    @VisibleForTesting
    List<HookConsumer> consumers;

    // LOG: general logging. PERF_LOG: passed to AtlasPerfTracer by handleMessage.
    // FAILED: receives "[DROPPED_NOTIFICATION]" entries from recordFailedMessages.
    // LARGE_MESSAGES: receives a JSON-shaped entry for messages whose processing time
    // exceeds largeMessageProcessingTimeThresholdMs.
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) NotificationHookConsumer.class);
    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger((Class<?>) NotificationHookConsumer.class);
    private static final Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
    private static final Logger LARGE_MESSAGES_LOG = LoggerFactory.getLogger("LARGE_MESSAGES");
    private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
    private static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString();
    private final List<Pattern> hiveTablesToIgnore = new ArrayList();
    private final List<Pattern> hiveTablesToPrune = new ArrayList();

    // Instance initialisers run in textual order: applicationProperties has to stay above
    // every field below, because each of their initialisers reads from it.
    private Configuration applicationProperties = ApplicationProperties.get();
    // Number of processing attempts per message (default 3).
    private final int maxRetries = this.applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
    // Failed messages buffered before they are flushed to the FAILED logger (default 1).
    private final int failedMsgCacheSize = this.applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1);

    // Sleep in milliseconds between processing attempts of one message (default 500).
    @VisibleForTesting
    final int consumerRetryInterval = this.applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
    // Bounds handed to AdaptiveWaiter: the minimum defaults to consumerRetryInterval and
    // the maximum defaults to 60 times the minimum.
    private final int minWaitDuration = this.applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, this.consumerRetryInterval);
    private final int maxWaitDuration = this.applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, this.minWaitDuration * 60);
    // Entity lists larger than this are written in batches by createOrUpdate; a value
    // <= 0 disables batching (default 50).
    private final int commitBatchSize = this.applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 50);
    private final boolean skipHiveColumnLineageHive20633 = this.applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, false);
    private final int skipHiveColumnLineageHive20633InputsThreshold = this.applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15);
    private final boolean consumerDisabled = this.applicationProperties.getBoolean(CONSUMER_DISABLED, false);
    // Processing time in milliseconds above which a message is reported to the
    // LARGE_MESSAGES logger (default 60000).
    private final int largeMessageProcessingTimeThresholdMs = this.applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60000);

    /* loaded from: input_file:WEB-INF/lib/atlas-webapp-1.2.0.jar:org/apache/atlas/notification/NotificationHookConsumer$AdaptiveWaiter.class */
    /**
     * Back-off helper: every {@link #pause(Exception)} sleeps for a duration that grows
     * by a fixed increment up to a maximum, and falls back to the minimum once more than
     * twice the maximum duration has elapsed since the previous pause.
     */
    static class AdaptiveWaiter {
        private final long increment;
        private final long maxDuration;
        private final long minDuration;
        private final long resetInterval;

        /** Wall-clock time (ms) of the previous pause; 0 means "no pause yet". */
        private long lastWaitAt = 0;

        /** Duration (ms) used by the most recent pause; package-private for tests. */
        @VisibleForTesting
        long waitDuration;

        /**
         * @param j  minimum wait in milliseconds; also the initial wait duration
         * @param j2 maximum wait in milliseconds; the reset interval is twice this value
         * @param j3 amount in milliseconds added to the wait on each consecutive pause
         */
        public AdaptiveWaiter(long j, long j2, long j3) {
            this.minDuration = j;
            this.waitDuration = j;
            this.maxDuration = j2;
            this.resetInterval = j2 * 2;
            this.increment = j3;
        }

        /**
         * Recomputes the wait duration and then sleeps for it. An interruption ends the
         * sleep early and is only reported at debug level.
         *
         * @param exc the failure that triggered the pause; only used for debug logging
         */
        public void pause(Exception exc) {
            setWaitDurations();

            final Logger log = NotificationHookConsumer.LOG;

            try {
                if (log.isDebugEnabled()) {
                    log.debug("{} in NotificationHookConsumer. Waiting for {} ms for recovery.", exc.getClass().getName(), Long.valueOf(this.waitDuration), exc);
                }

                Thread.sleep(this.waitDuration);
            } catch (InterruptedException interrupted) {
                if (log.isDebugEnabled()) {
                    log.debug("{} in NotificationHookConsumer. Waiting for recovery interrupted.", exc.getClass().getName(), interrupted);
                }
            }
        }

        /**
         * Updates {@code waitDuration}: resets it to the minimum when the previous pause
         * is older than the reset interval, otherwise grows it by one increment, capped
         * at the maximum. The very first call measures an elapsed time of 0.
         */
        private void setWaitDurations() {
            final long sinceLastWait;

            if (this.lastWaitAt == 0) {
                sinceLastWait = 0L;
            } else {
                sinceLastWait = System.currentTimeMillis() - this.lastWaitAt;
            }

            this.lastWaitAt = System.currentTimeMillis();

            if (sinceLastWait > this.resetInterval) {
                this.waitDuration = this.minDuration;
            } else {
                this.waitDuration = Math.min(this.waitDuration + this.increment, this.maxDuration);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/atlas-webapp-1.2.0.jar:org/apache/atlas/notification/NotificationHookConsumer$FailedCommitOffsetRecorder.class */
    /**
     * Remembers the offset of the most recent commit that failed, so that the same
     * message can be recognised when it is delivered again.
     */
    public static class FailedCommitOffsetRecorder {
        /** Offset whose commit failed last; {@code null} when the last commit succeeded or none was recorded. */
        private Long currentOffset;

        FailedCommitOffsetRecorder() {
        }

        /**
         * Records the outcome of a commit attempt.
         *
         * @param z {@code true} when the commit succeeded (clears the recorded offset),
         *          {@code false} when it failed (stores {@code j})
         * @param j offset of the message whose commit was attempted
         */
        public void recordIfFailed(boolean z, long j) {
            this.currentOffset = z ? null : Long.valueOf(j);
        }

        /**
         * @param j offset of an incoming message
         * @return {@code true} only when a failed commit is on record for exactly this offset
         */
        public boolean isMessageReplayed(long j) {
            final Long recorded = this.currentOffset;

            if (recorded == null) {
                return false;
            }

            return recorded.longValue() == j;
        }

        /** @return the recorded failed-commit offset, or {@code null} when there is none */
        public Long getCurrentOffset() {
            return this.currentOffset;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:WEB-INF/lib/atlas-webapp-1.2.0.jar:org/apache/atlas/notification/NotificationHookConsumer$HookConsumer.class */
    public class HookConsumer extends ShutdownableThread {
        private final NotificationConsumer<HookNotification> consumer;
        private final AtomicBoolean shouldRun;
        private final List<String> failedMessages;
        private final AdaptiveWaiter adaptiveWaiter;

        @VisibleForTesting
        final FailedCommitOffsetRecorder failedCommitOffsetRecorder;

        public HookConsumer(NotificationConsumer<HookNotification> notificationConsumer) {
            super("atlas-hook-consumer-thread", false);
            this.shouldRun = new AtomicBoolean(false);
            this.failedMessages = new ArrayList();
            this.adaptiveWaiter = new AdaptiveWaiter(NotificationHookConsumer.this.minWaitDuration, NotificationHookConsumer.this.maxWaitDuration, NotificationHookConsumer.this.minWaitDuration);
            this.consumer = notificationConsumer;
            this.failedCommitOffsetRecorder = new FailedCommitOffsetRecorder();
        }

        /**
         * Main loop of the consumer thread: marks the thread as running, waits until
         * {@code serverAvailable} reports success, then keeps receiving messages and
         * handing each one to {@link #handleMessage} until {@code shouldRun} is cleared.
         *
         * <p>Failure handling inside the loop:
         * <ul>
         *   <li>{@link IllegalStateException}: back off via the adaptive waiter and retry;</li>
         *   <li>any other exception while still running: log a warning, back off, retry;</li>
         *   <li>any other exception after {@code shouldRun} was cleared: leave the loop.</li>
         * </ul>
         *
         * <p>The consumer is closed exactly once, after the loop has ended. When
         * {@code serverAvailable} returns {@code false} the method returns without
         * touching the consumer.
         */
        @Override // kafka.utils.ShutdownableThread
        public void doWork() {
            NotificationHookConsumer.LOG.info("==> HookConsumer doWork()");
            this.shouldRun.set(true);

            if (!serverAvailable(new Timer())) {
                return;
            }

            try {
                while (this.shouldRun.get()) {
                    try {
                        for (AtlasKafkaMessage<HookNotification> kafkaMessage : this.consumer.receive()) {
                            handleMessage(kafkaMessage);
                        }
                    } catch (IllegalStateException e) {
                        // Must be listed before the generic handler: IllegalStateException
                        // is an Exception and would otherwise never reach this branch.
                        this.adaptiveWaiter.pause(e);
                    } catch (Exception e) {
                        if (!this.shouldRun.get()) {
                            // The failure was caused by (or coincided with) a shutdown request.
                            break;
                        }

                        NotificationHookConsumer.LOG.warn("Exception in NotificationHookConsumer", (Throwable) e);
                        this.adaptiveWaiter.pause(e);
                    }
                }
            } finally {
                // Close only after the loop is done; closing per iteration would leave the
                // loop polling a consumer that has already been closed.
                if (this.consumer != null) {
                    NotificationHookConsumer.LOG.info("closing NotificationConsumer");
                    this.consumer.close();
                }

                NotificationHookConsumer.LOG.info("<== HookConsumer doWork()");
            }
        }

        /**
         * Processes a single hook notification and commits its offset.
         *
         * <p>Flow:
         * <ol>
         *   <li>A message whose offset matches a previously failed commit is not
         *       processed again; only the commit is retried.</li>
         *   <li>The message is pre-processed; if it is empty afterwards it is committed
         *       without further work.</li>
         *   <li>Otherwise the notification is applied to the entity store, with up to
         *       {@code maxRetries} attempts and a sleep of {@code consumerRetryInterval}
         *       milliseconds between attempts. The request context is cleared after
         *       every attempt.</li>
         *   <li>When the last attempt fails, the message JSON is buffered in
         *       {@code failedMessages} (flushed once the buffer reaches
         *       {@code failedMsgCacheSize}) and the method returns without committing.</li>
         *   <li>On success, or when {@code maxRetries} is not positive, the offset is
         *       committed.</li>
         * </ol>
         *
         * <p>On every exit path, including exceptions, the perf tracer is logged, slow
         * messages are reported to the large-message logger, and - if an audit entry was
         * created - it is written with status 400 for a failed message and 200 otherwise.
         *
         * @param atlasKafkaMessage the received message with its partition and offset
         * @throws AtlasServiceException declared for callers; see the outer-class helpers
         * @throws AtlasException        declared for callers; see the outer-class helpers
         */
        @VisibleForTesting
        void handleMessage(AtlasKafkaMessage<HookNotification> atlasKafkaMessage) throws AtlasServiceException, AtlasException {
            final HookNotification message = atlasKafkaMessage.getMessage();
            final String user = message.getUser();
            final long startTime = System.currentTimeMillis();
            final AtlasPerfTracer perfTracer = AtlasPerfTracer.isPerfTraceEnabled(NotificationHookConsumer.PERF_LOG)
                    ? AtlasPerfTracer.getPerfTracer(NotificationHookConsumer.PERF_LOG, message.getType().name())
                    : null;

            // Created lazily by the first attempt that gets far enough, and reused by retries.
            AuditFilter.AuditLog auditLog = null;
            boolean failed = false;

            try {
                if (this.failedCommitOffsetRecorder.isMessageReplayed(atlasKafkaMessage.getOffset())) {
                    // Already processed; only the commit failed last time.
                    commit(atlasKafkaMessage);
                    return;
                }

                final PreprocessorContext context = NotificationHookConsumer.this.preProcessNotificationMessage(atlasKafkaMessage);

                if (NotificationHookConsumer.this.isEmptyMessage(atlasKafkaMessage)) {
                    commit(atlasKafkaMessage);
                    return;
                }

                for (int attempt = 0; attempt < NotificationHookConsumer.this.maxRetries; attempt++) {
                    if (NotificationHookConsumer.LOG.isDebugEnabled()) {
                        NotificationHookConsumer.LOG.debug("handleMessage({}): attempt {}", message.getType().name(), Integer.valueOf(attempt));
                    }

                    try {
                        final RequestContext requestContext = RequestContext.get();

                        requestContext.setAttemptCount(attempt + 1);
                        requestContext.setMaxAttempts(NotificationHookConsumer.this.maxRetries);
                        requestContext.setUser(user, null);
                        requestContext.setInNotificationProcessing(true);

                        switch (message.getType()) {
                            case ENTITY_CREATE: {
                                final AtlasEntity.AtlasEntitiesWithExtInfo entities = NotificationHookConsumer.this.instanceConverter.toAtlasEntities(((HookNotificationV1.EntityCreateRequest) message).getEntities());

                                if (auditLog == null) {
                                    auditLog = new AuditFilter.AuditLog(user, NotificationHookConsumer.THREADNAME_PREFIX, AtlasClient.API_V1.CREATE_ENTITY.getMethod(), AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath());
                                }

                                createOrUpdate(entities, false, context);
                                break;
                            }

                            case ENTITY_PARTIAL_UPDATE: {
                                final HookNotificationV1.EntityPartialUpdateRequest request = (HookNotificationV1.EntityPartialUpdateRequest) message;
                                final AtlasEntity.AtlasEntitiesWithExtInfo entities = NotificationHookConsumer.this.instanceConverter.toAtlasEntity(request.getEntity());

                                if (auditLog == null) {
                                    auditLog = new AuditFilter.AuditLog(user, NotificationHookConsumer.THREADNAME_PREFIX, AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(), String.format(AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE.getNormalizedPath(), request.getTypeName()));
                                }

                                // Resolve the target entity by its unique attribute and stamp its GUID
                                // on the converted entity before applying the partial update.
                                entities.getEntities().get(0).setGuid(AtlasGraphUtilsV2.getGuidByUniqueAttributes(
                                        NotificationHookConsumer.this.typeRegistry.getEntityTypeByName(request.getTypeName()),
                                        Collections.singletonMap(request.getAttribute(), request.getAttributeValue())));

                                createOrUpdate(entities, true, context);
                                break;
                            }

                            case ENTITY_DELETE: {
                                final HookNotificationV1.EntityDeleteRequest request = (HookNotificationV1.EntityDeleteRequest) message;

                                if (auditLog == null) {
                                    auditLog = new AuditFilter.AuditLog(user, NotificationHookConsumer.THREADNAME_PREFIX, AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getMethod(), String.format(AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getNormalizedPath(), request.getTypeName()));
                                }

                                try {
                                    NotificationHookConsumer.this.atlasEntityStore.deleteByUniqueAttributes(
                                            (AtlasEntityType) NotificationHookConsumer.this.typeRegistry.getType(request.getTypeName()),
                                            Collections.singletonMap(request.getAttribute(), request.getAttributeValue()));
                                } catch (ClassCastException e) {
                                    // The named type is not an entity type: log (with cause) and treat
                                    // the message as handled rather than retrying it.
                                    NotificationHookConsumer.LOG.error("Failed to delete entity {}", request, e);
                                }
                                break;
                            }

                            case ENTITY_FULL_UPDATE: {
                                final AtlasEntity.AtlasEntitiesWithExtInfo entities = NotificationHookConsumer.this.instanceConverter.toAtlasEntities(((HookNotificationV1.EntityUpdateRequest) message).getEntities());

                                if (auditLog == null) {
                                    auditLog = new AuditFilter.AuditLog(user, NotificationHookConsumer.THREADNAME_PREFIX, AtlasClientV2.API_V2.UPDATE_ENTITY.getMethod(), AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                                }

                                createOrUpdate(entities, false, context);
                                break;
                            }

                            case ENTITY_CREATE_V2: {
                                final AtlasEntity.AtlasEntitiesWithExtInfo entities = ((HookNotification.EntityCreateRequestV2) message).getEntities();

                                if (auditLog == null) {
                                    auditLog = new AuditFilter.AuditLog(user, NotificationHookConsumer.THREADNAME_PREFIX, AtlasClientV2.API_V2.CREATE_ENTITY.getMethod(), AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath());
                                }

                                createOrUpdate(entities, false, context);
                                break;
                            }

                            case ENTITY_PARTIAL_UPDATE_V2: {
                                final HookNotification.EntityPartialUpdateRequestV2 request = (HookNotification.EntityPartialUpdateRequestV2) message;
                                final AtlasObjectId entityId = request.getEntityId();
                                final AtlasEntity.AtlasEntityWithExtInfo entity = request.getEntity();

                                if (auditLog == null) {
                                    auditLog = new AuditFilter.AuditLog(user, NotificationHookConsumer.THREADNAME_PREFIX, AtlasClientV2.API_V2.UPDATE_ENTITY.getMethod(), AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                                }

                                NotificationHookConsumer.this.atlasEntityStore.updateEntity(entityId, entity, true);
                                break;
                            }

                            case ENTITY_FULL_UPDATE_V2: {
                                final AtlasEntity.AtlasEntitiesWithExtInfo entities = ((HookNotification.EntityUpdateRequestV2) message).getEntities();

                                if (auditLog == null) {
                                    auditLog = new AuditFilter.AuditLog(user, NotificationHookConsumer.THREADNAME_PREFIX, AtlasClientV2.API_V2.UPDATE_ENTITY.getMethod(), AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                                }

                                createOrUpdate(entities, false, context);
                                break;
                            }

                            case ENTITY_DELETE_V2: {
                                final List<AtlasObjectId> objectIds = ((HookNotification.EntityDeleteRequestV2) message).getEntities();

                                try {
                                    for (AtlasObjectId objectId : objectIds) {
                                        // One audit entry per message, described by the first entity.
                                        if (auditLog == null) {
                                            auditLog = new AuditFilter.AuditLog(user, NotificationHookConsumer.THREADNAME_PREFIX, AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getMethod(), String.format(AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE.getNormalizedPath(), objectId.getTypeName()));
                                        }

                                        NotificationHookConsumer.this.atlasEntityStore.deleteByUniqueAttributes(
                                                (AtlasEntityType) NotificationHookConsumer.this.typeRegistry.getType(objectId.getTypeName()),
                                                objectId.getUniqueAttributes());
                                    }
                                } catch (ClassCastException e) {
                                    // Same policy as ENTITY_DELETE: log with cause, do not retry.
                                    NotificationHookConsumer.LOG.error("Failed to do delete entities {}", objectIds, e);
                                }
                                break;
                            }

                            default:
                                throw new IllegalStateException("Unknown notification type: " + message.getType().name());
                        }

                        // Processed successfully: stop retrying.
                        break;
                    } catch (Throwable t) {
                        RequestContext.get().resetEntityGuidUpdates();

                        if (attempt == NotificationHookConsumer.this.maxRetries - 1) {
                            final String failedJson = AbstractNotification.getMessageJson(message);

                            NotificationHookConsumer.LOG.warn("Max retries exceeded for message {}", failedJson, t);

                            failed = true;
                            this.failedMessages.add(failedJson);

                            if (this.failedMessages.size() >= NotificationHookConsumer.this.failedMsgCacheSize) {
                                recordFailedMessages();
                            }

                            // Deliberately no commit here; the outer finally still runs.
                            return;
                        }

                        NotificationHookConsumer.LOG.warn("Error handling message", t);

                        try {
                            NotificationHookConsumer.LOG.info("Sleeping for {} ms before retry", Integer.valueOf(NotificationHookConsumer.this.consumerRetryInterval));
                            Thread.sleep(NotificationHookConsumer.this.consumerRetryInterval);
                        } catch (InterruptedException ie) {
                            NotificationHookConsumer.LOG.error("Notification consumer thread sleep interrupted");
                        }
                    } finally {
                        // Each attempt starts from a fresh request context.
                        RequestContext.clear();
                    }
                }

                commit(atlasKafkaMessage);
            } finally {
                AtlasPerfTracer.log(perfTracer);

                final long elapsedTime = perfTracer != null ? perfTracer.getElapsedTime() : 0L;

                if (elapsedTime > NotificationHookConsumer.this.largeMessageProcessingTimeThresholdMs) {
                    final String messageJson = AbstractNotification.getMessageJson(message);

                    NotificationHookConsumer.LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", Long.valueOf(elapsedTime), Integer.valueOf(messageJson.length()), Long.valueOf(atlasKafkaMessage.getOffset()));
                    NotificationHookConsumer.LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", Long.valueOf(elapsedTime), Integer.valueOf(messageJson.length()), Long.valueOf(atlasKafkaMessage.getOffset()), messageJson);
                }

                if (auditLog != null) {
                    auditLog.setHttpStatus(failed ? NotificationHookConsumer.SC_BAD_REQUEST : NotificationHookConsumer.SC_OK);
                    auditLog.setTimeTaken(System.currentTimeMillis() - startTime);
                    AuditFilter.audit(auditLog);
                }
            }
        }

        /**
         * Writes the given entities to the entity store and records the processed
         * entities in the preprocessor context.
         *
         * <p>When {@code commitBatchSize} is positive and the entity list is larger than
         * it, the list is written in consecutive batches of at most that size; each batch
         * stream is backed by the stream over the complete payload, and the request
         * context's GUID updates and cache are reset after every batch. Otherwise all
         * entities are written in one call. Finally, if a context is present, its
         * post-update entities (if any) are written as a partial update.
         *
         * @param atlasEntitiesWithExtInfo entities (with referred entities) to write
         * @param z                        passed through to the store as its second argument;
         *                                 callers in this class pass {@code true} only for partial updates
         * @param preprocessorContext      context of the notification being handled
         * @throws AtlasBaseException if the entity store rejects a write
         */
        private void createOrUpdate(AtlasEntity.AtlasEntitiesWithExtInfo atlasEntitiesWithExtInfo, boolean z, PreprocessorContext preprocessorContext) throws AtlasBaseException {
            final List<AtlasEntity> allEntities = atlasEntitiesWithExtInfo.getEntities();
            final AtlasEntityStream fullStream = new AtlasEntityStream(atlasEntitiesWithExtInfo);
            final int batchSize = NotificationHookConsumer.this.commitBatchSize;

            if (batchSize <= 0 || allEntities.size() <= batchSize) {
                final EntityMutationResponse response = NotificationHookConsumer.this.atlasEntityStore.createOrUpdate(fullStream, z);

                NotificationHookConsumer.this.recordProcessedEntities(response, preprocessorContext);
            } else {
                for (int from = 0; from < allEntities.size(); from += batchSize) {
                    final int to = Math.min(from + batchSize, allEntities.size());
                    final List<AtlasEntity> batch = new ArrayList<>(allEntities.subList(from, to));

                    // NOTE(review): the context is dereferenced here without the null check
                    // that guards the post-update step below - confirm it can never be null.
                    NotificationHookConsumer.this.updateProcessedEntityReferences(batch, preprocessorContext.getGuidAssignments());

                    final AtlasEntityStream batchStream = new AtlasEntityStream(new AtlasEntity.AtlasEntitiesWithExtInfo(batch), fullStream);
                    final EntityMutationResponse response = NotificationHookConsumer.this.atlasEntityStore.createOrUpdate(batchStream, z);

                    NotificationHookConsumer.this.recordProcessedEntities(response, preprocessorContext);

                    RequestContext.get().resetEntityGuidUpdates();
                    RequestContext.get().clearCache();
                }
            }

            if (preprocessorContext == null) {
                return;
            }

            preprocessorContext.prepareForPostUpdate();

            final List<AtlasEntity> postUpdateEntities = preprocessorContext.getPostUpdateEntities();

            if (CollectionUtils.isNotEmpty(postUpdateEntities)) {
                NotificationHookConsumer.this.atlasEntityStore.createOrUpdate(new AtlasEntityStream(postUpdateEntities), true);
            }
        }

        /**
         * Logs every message currently held in {@code failedMessages} to the failed-message
         * log as a dropped notification, then empties the collection.
         */
        private void recordFailedMessages() {
            for (String droppedMessage : this.failedMessages) {
                NotificationHookConsumer.FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", droppedMessage);
            }
            this.failedMessages.clear();
        }

        /**
         * Flushes the failed-message log and commits the offset following the given message
         * for its partition of the hook topic.
         *
         * <p>The outcome is reported to {@code failedCommitOffsetRecorder} exactly once, whether
         * the commit succeeded or an exception is propagating.
         *
         * @param atlasKafkaMessage the message whose offset is being committed
         */
        private void commit(AtlasKafkaMessage<HookNotification> atlasKafkaMessage) {
            boolean committed = false;
            try {
                recordFailedMessages();
                this.consumer.commit(new TopicPartition(NotificationHookConsumer.ATLAS_HOOK_TOPIC, atlasKafkaMessage.getPartition()), atlasKafkaMessage.getOffset() + 1);
                committed = true;
            } finally {
                // Single bookkeeping call for both outcomes; the previous catch-and-rethrow form
                // invoked the recorder a second time if the success-path call itself threw.
                this.failedCommitOffsetRecorder.recordIfFailed(committed, atlasKafkaMessage.getOffset());
            }
        }

        /**
         * Blocks until the service state becomes {@code ACTIVE}, sleeping one second between checks.
         *
         * @param timer sleeper used for the wait between checks
         * @return {@code true} once the server is active; {@code false} if the wait was interrupted
         *         or any other throwable was raised while waiting
         */
        boolean serverAvailable(Timer timer) {
            final int retryWaitMs = 1000;
            while (NotificationHookConsumer.this.serviceState.getState() != ServiceState.ServiceStateValue.ACTIVE) {
                try {
                    NotificationHookConsumer.LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", Integer.valueOf(retryWaitMs));
                    timer.sleep(retryWaitMs);
                } catch (InterruptedException e) {
                    NotificationHookConsumer.LOG.info("Interrupted while waiting for Atlas Server to become ready, exiting consumer thread.", (Throwable) e);
                    // Restore the interrupt status (after logging) so the owner of this thread
                    // can still observe that an interrupt was requested.
                    Thread.currentThread().interrupt();
                    return false;
                } catch (Throwable th) {
                    NotificationHookConsumer.LOG.info("Handled AtlasServiceException while waiting for Atlas Server to become ready, exiting consumer thread.", th);
                    return false;
                }
            }
            NotificationHookConsumer.LOG.info("Atlas Server is ready, can start reading Kafka events.");
            return true;
        }

        /**
         * Stops this consumer thread if it is still flagged as running: initiates the thread
         * shutdown, clears the run flag, wakes up the consumer when one is set, and waits for
         * the thread to finish. Does nothing beyond the entry log line when the run flag is
         * already cleared.
         */
        @Override // kafka.utils.ShutdownableThread
        public void shutdown() {
            NotificationHookConsumer.LOG.info("==> HookConsumer shutdown()");
            if (!this.shouldRun.get()) {
                // Already stopped; nothing to do.
                return;
            }
            super.initiateShutdown();
            this.shouldRun.set(false);
            if (this.consumer != null) {
                this.consumer.wakeup();
            }
            super.awaitShutdown();
            NotificationHookConsumer.LOG.info("<== HookConsumer shutdown()");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/atlas-webapp-1.2.0.jar:org/apache/atlas/notification/NotificationHookConsumer$Timer.class */
    /**
     * Thin wrapper around {@link Thread#sleep(long)} so that callers receive the sleeper as an
     * object instead of calling the static method directly.
     */
    public static class Timer {
        Timer() {
        }

        /**
         * Puts the current thread to sleep.
         *
         * @param i time to sleep, in milliseconds
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        public void sleep(int i) throws InterruptedException {
            final long millis = i;
            Thread.sleep(millis);
        }
    }

    /**
     * Wires the collaborators and reads the preprocessing settings from the application
     * properties: the hive-table ignore and prune patterns, the hive-table cache, and the
     * flags that decide whether message preprocessing is enabled. The effective settings are
     * logged at the end.
     *
     * @param notificationInterface  source of the hook notification consumers
     * @param atlasEntityStore       store used to persist entities
     * @param serviceState           service state holder
     * @param atlasInstanceConverter instance converter
     * @param atlasTypeRegistry      type registry
     * @throws AtlasException declared by the signature
     */
    @Inject
    public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore, ServiceState serviceState, AtlasInstanceConverter atlasInstanceConverter, AtlasTypeRegistry atlasTypeRegistry) throws AtlasException {
        this.notificationInterface = notificationInterface;
        this.atlasEntityStore = atlasEntityStore;
        this.serviceState = serviceState;
        this.instanceConverter = atlasInstanceConverter;
        this.typeRegistry = atlasTypeRegistry;

        String[] ignorePatternTexts = this.applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
        String[] prunePatternTexts = this.applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN);
        compileTablePatterns(ignorePatternTexts, CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN, this.hiveTablesToIgnore);
        compileTablePatterns(prunePatternTexts, CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN, this.hiveTablesToPrune);

        // The table cache is only needed when at least one pattern is configured.
        boolean noTablePatterns = this.hiveTablesToIgnore.isEmpty() && this.hiveTablesToPrune.isEmpty();
        if (noTablePatterns) {
            this.hiveTablesCache = Collections.emptyMap();
        } else {
            this.hiveTablesCache = new LruCache(this.applicationProperties.getInt(CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE, 10000), 0);
        }

        this.hiveTypesRemoveOwnedRefAttrs = this.applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true);
        this.rdbmsTypesRemoveOwnedRefAttrs = this.applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true);
        this.preprocessEnabled = !noTablePatterns || this.skipHiveColumnLineageHive20633 || this.hiveTypesRemoveOwnedRefAttrs || this.rdbmsTypesRemoveOwnedRefAttrs;

        LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, Boolean.valueOf(this.skipHiveColumnLineageHive20633));
        LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, Integer.valueOf(this.skipHiveColumnLineageHive20633InputsThreshold));
        LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, Boolean.valueOf(this.hiveTypesRemoveOwnedRefAttrs));
        LOG.info("{}={}", CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, Boolean.valueOf(this.rdbmsTypesRemoveOwnedRefAttrs));
        LOG.info("{}={}", CONSUMER_COMMIT_BATCH_SIZE, Integer.valueOf(this.commitBatchSize));
        LOG.info("{}={}", CONSUMER_DISABLED, Boolean.valueOf(this.consumerDisabled));
    }

    /**
     * Compiles each configured regular expression and adds it to {@code sink}. A pattern that
     * fails to compile is logged and skipped; the remaining patterns are still processed.
     *
     * @param patternTexts pattern strings read from configuration; {@code null} means none
     * @param propertyName configuration key the patterns came from, used in log output
     * @param sink         collection receiving the compiled patterns
     */
    private static void compileTablePatterns(String[] patternTexts, String propertyName, Collection<Pattern> sink) {
        if (patternTexts == null) {
            return;
        }
        for (String patternText : patternTexts) {
            try {
                sink.add(Pattern.compile(patternText));
                LOG.info("{}={}", propertyName, patternText);
            } catch (Throwable failure) {
                LOG.warn("failed to compile pattern {}", patternText, failure);
                LOG.warn("Ignoring invalid pattern in configuration {}: {}", propertyName, patternText);
            }
        }
    }

    /**
     * Starts hook-message consumption unless the consumer is disabled by configuration, in
     * which case only an informational message is logged.
     *
     * @throws AtlasException declared by the {@code Service} contract
     */
    @Override // org.apache.atlas.service.Service
    public void start() throws AtlasException {
        if (!this.consumerDisabled) {
            startInternal(this.applicationProperties, null);
            return;
        }
        LOG.info("Hook consumer stopped. No hook messages will be processed. Set property '{}' to false to start consuming hook messages.", CONSUMER_DISABLED);
    }

    /**
     * Prepares the consumer list and executor, and starts the consumers immediately when HA
     * is not enabled in the given configuration. When HA is enabled nothing is started here.
     *
     * @param configuration   configuration consulted for the HA setting
     * @param executorService executor to run the consumers on; {@code null} leaves the current
     *                        executor field untouched and lets {@code startConsumers} create one
     */
    void startInternal(Configuration configuration, ExecutorService executorService) {
        if (this.consumers == null) {
            this.consumers = new ArrayList<>();
        }
        if (executorService != null) {
            this.executors = executorService;
        }
        if (!HAConfiguration.isHAEnabled(configuration)) {
            LOG.info("HA is disabled, starting consumers inline.");
            startConsumers(executorService);
        }
    }

    /**
     * Creates the configured number of hook notification consumers, wraps each one in a
     * {@code HookConsumer}, registers it in {@code consumers} and submits it to the executor.
     *
     * @param executorService executor to use; when {@code null} a fixed thread pool sized to
     *                        the number of created consumers is built
     */
    private void startConsumers(ExecutorService executorService) {
        int threadCount = this.applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
        List createdConsumers = this.notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, threadCount);

        ExecutorService pool = executorService;
        if (pool == null) {
            pool = Executors.newFixedThreadPool(createdConsumers.size(), new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
        }
        this.executors = pool;

        for (Object created : createdConsumers) {
            HookConsumer hookConsumer = new HookConsumer((NotificationConsumer) created);
            this.consumers.add(hookConsumer);
            this.executors.submit(hookConsumer);
        }
    }

    /**
     * Stops all consumer threads, shuts the executor down (waiting up to five seconds for it
     * to terminate) and closes the notification interface.
     *
     * <p>If the calling thread is interrupted while waiting, the failure is logged with its
     * cause and the thread's interrupt status is restored; the remaining steps are skipped.
     */
    @Override // org.apache.atlas.service.Service
    public void stop() {
        try {
            stopConsumerThreads();
            if (this.executors != null) {
                this.executors.shutdown();
                if (!this.executors.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
                    LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
                this.executors = null;
            }
            this.notificationInterface.close();
        } catch (InterruptedException e) {
            LOG.error("Failure in shutting down consumers", e);
            // Do not swallow the interrupt: let the caller see that it was requested.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Shuts down every registered {@code HookConsumer} and then empties the consumer list.
     * Does nothing except logging when the list has not been created.
     */
    private void stopConsumerThreads() {
        LOG.info("==> stopConsumerThreads()");
        if (this.consumers != null) {
            for (HookConsumer hookConsumer : this.consumers) {
                hookConsumer.shutdown();
            }
            this.consumers.clear();
        }
        LOG.info("<== stopConsumerThreads()");
    }

    /**
     * Reacts to this instance becoming active by starting the Kafka consumers on the current
     * executor.
     *
     * <p>Honours the same {@code consumerDisabled} switch as {@link #start()}: when the
     * consumer is disabled no consumers are started. In that configuration {@code start()}
     * never calls {@code startInternal}, so the consumer list may not have been created.
     */
    @Override // org.apache.atlas.listener.ActiveStateChangeHandler
    public void instanceIsActive() {
        if (this.consumerDisabled) {
            LOG.info("Hook consumer stopped. No hook messages will be processed. Set property '{}' to false to start consuming hook messages.", CONSUMER_DISABLED);
            return;
        }
        LOG.info("Reacting to active state: initializing Kafka consumers");
        startConsumers(this.executors);
    }

    /**
     * Reacts to this instance becoming passive by delegating to {@link #stop()}.
     */
    @Override // org.apache.atlas.listener.ActiveStateChangeHandler
    public void instanceIsPassive() {
        LOG.info("Reacting to passive state: shutting down Kafka consumers.");
        this.stop();
    }

    /**
     * @return the order value of {@code HandlerOrder.NOTIFICATION_HOOK_CONSUMER}
     */
    @Override // org.apache.atlas.listener.ActiveStateChangeHandler
    public int getHandlerOrder() {
        ActiveStateChangeHandler.HandlerOrder handlerOrder = ActiveStateChangeHandler.HandlerOrder.NOTIFICATION_HOOK_CONSUMER;
        return handlerOrder.getOrder();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Removed duplicated region for block: B:29:0x00e8  */
    /* JADX WARN: Removed duplicated region for block: B:32:0x0100 A[SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder left by the decompiler: the original body of this method could not be
     * reconstructed (see the JADX notes above), so this version unconditionally throws.
     *
     * @param r11 the Kafka message wrapping the hook notification
     * @return never returns normally in this decompiled form
     * @throws UnsupportedOperationException always
     */
    public org.apache.atlas.notification.preprocessor.PreprocessorContext preProcessNotificationMessage(org.apache.atlas.kafka.AtlasKafkaMessage<org.apache.atlas.model.notification.HookNotification> r11) {
        /*
            Method dump skipped, instructions count: 307
            To view this dump add '--comments-level debug' option
        */
        // The real logic is missing from this source; restore it from the bytecode before relying on this method.
        throw new UnsupportedOperationException("Method not decompiled: org.apache.atlas.notification.NotificationHookConsumer.preProcessNotificationMessage(org.apache.atlas.kafka.AtlasKafkaMessage):org.apache.atlas.notification.preprocessor.PreprocessorContext");
    }

    /**
     * Runs the RDBMS preprocessor, when one exists for the entity's type, over every entity
     * in the context.
     *
     * @param preprocessorContext context holding the entities of the current message
     */
    private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext preprocessorContext) {
        List<AtlasEntity> entities = preprocessorContext.getEntities();
        if (entities == null) {
            return;
        }
        // Walk by index and re-read the size on each pass, exactly like a plain indexed loop.
        int position = 0;
        while (position < entities.size()) {
            AtlasEntity entity = entities.get(position);
            EntityPreprocessor preprocessor = EntityPreprocessor.getRdbmsPreprocessor(entity.getTypeName());
            if (preprocessor != null) {
                preprocessor.preprocess(entity, preprocessorContext);
            }
            position++;
        }
    }

    /**
     * Runs the Hive preprocessor, when one exists for the entity's type, over the entities
     * and the referred entities of the context, dropping every entity the context marks as
     * ignored afterwards. Logs a summary when anything was ignored or pruned.
     *
     * @param preprocessorContext context holding the entities of the current message
     */
    private void preprocessHiveTypes(PreprocessorContext preprocessorContext) {
        List<AtlasEntity> entities = preprocessorContext.getEntities();
        if (entities == null) {
            return;
        }

        for (int idx = 0; idx < entities.size(); ) {
            AtlasEntity entity = entities.get(idx);
            EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(entity.getTypeName());
            if (preprocessor != null) {
                preprocessor.preprocess(entity, preprocessorContext);
                if (preprocessorContext.isIgnoredEntity(entity.getGuid())) {
                    // The next element slides into this slot, so the index must not advance.
                    entities.remove(idx);
                    continue;
                }
            }
            idx++;
        }

        Map<String, AtlasEntity> referredEntities = preprocessorContext.getReferredEntities();
        if (referredEntities != null) {
            for (Iterator<Map.Entry<String, AtlasEntity>> entryIter = referredEntities.entrySet().iterator(); entryIter.hasNext(); ) {
                AtlasEntity referred = entryIter.next().getValue();
                EntityPreprocessor preprocessor = EntityPreprocessor.getHivePreprocessor(referred.getTypeName());
                if (preprocessor == null) {
                    continue;
                }
                preprocessor.preprocess(referred, preprocessorContext);
                if (preprocessorContext.isIgnoredEntity(referred.getGuid())) {
                    entryIter.remove();
                }
            }
        }

        int ignoredCount = preprocessorContext.getIgnoredEntities().size();
        int prunedCount = preprocessorContext.getPrunedEntities().size();
        if (ignoredCount > 0 || prunedCount > 0) {
            LOG.info("preprocess: ignored entities={}; pruned entities={}. topic-offset={}, partition={}", Integer.valueOf(ignoredCount), Integer.valueOf(prunedCount), Long.valueOf(preprocessorContext.getKafkaMessageOffset()), Integer.valueOf(preprocessorContext.getKafkaPartition()));
        }
    }

    /**
     * Cleans up {@code hive_column_lineage} entities in the message.
     *
     * <p>First pass: removes lineage entities whose {@code qualifiedName} was already seen in
     * this message, and accumulates the number of remaining lineage entities and the total
     * size of their {@code inputs} collections. Second pass: when the average number of inputs
     * per remaining lineage entity exceeds {@code skipHiveColumnLineageHive20633InputsThreshold},
     * removes all lineage entities. A warning summarises the removals.
     *
     * <p>The average is computed in floating point, and removed duplicates do not contribute
     * to it, so the value compared with the threshold (and logged) reflects only the entities
     * that are actually kept.
     *
     * @param preprocessorContext context holding the entities of the current message
     */
    private void skipHiveColumnLineage(PreprocessorContext preprocessorContext) {
        List<AtlasEntity> entities = preprocessorContext.getEntities();
        if (entities == null) {
            return;
        }

        int lineageCount = 0;
        int lineageInputsCount = 0;
        int removedCount = 0;
        HashSet<String> seenQualifiedNames = new HashSet<>();

        for (int idx = 0; idx < entities.size(); idx++) {
            AtlasEntity entity = entities.get(idx);
            if (!StringUtils.equals(entity.getTypeName(), "hive_column_lineage")) {
                continue;
            }
            Object qualifiedNameAttr = entity.getAttribute("qualifiedName");
            if (qualifiedNameAttr != null) {
                String qualifiedName = qualifiedNameAttr.toString();
                if (!seenQualifiedNames.add(qualifiedName)) {
                    // Step the index back so the element that slides into this slot is visited.
                    entities.remove(idx--);
                    LOG.warn("removed duplicate hive_column_lineage entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, Long.valueOf(preprocessorContext.getKafkaMessageOffset()), Integer.valueOf(preprocessorContext.getKafkaPartition()));
                    removedCount++;
                    // A removed duplicate must not skew the average below.
                    continue;
                }
            }
            lineageCount++;
            Object inputsAttr = entity.getAttribute("inputs");
            if (inputsAttr instanceof Collection) {
                lineageInputsCount += ((Collection<?>) inputsAttr).size();
            }
        }

        // Widen before dividing; integer division would truncate the average.
        float averageInputs = lineageCount > 0 ? ((float) lineageInputsCount) / lineageCount : 0.0f;

        if (averageInputs > this.skipHiveColumnLineageHive20633InputsThreshold) {
            for (Iterator<AtlasEntity> entityIter = entities.iterator(); entityIter.hasNext(); ) {
                if (StringUtils.equals(entityIter.next().getTypeName(), "hive_column_lineage")) {
                    entityIter.remove();
                    removedCount++;
                }
            }
        }

        if (removedCount > 0) {
            LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", Integer.valueOf(removedCount), Float.valueOf(averageInputs), Integer.valueOf(this.skipHiveColumnLineageHive20633InputsThreshold), Integer.valueOf(lineageInputsCount), Long.valueOf(preprocessorContext.getKafkaMessageOffset()), Integer.valueOf(preprocessorContext.getKafkaPartition()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells whether a V2 entity create or full-update notification carries no entities.
     *
     * @param atlasKafkaMessage the Kafka message wrapping the hook notification
     * @return {@code true} for {@code ENTITY_CREATE_V2} / {@code ENTITY_FULL_UPDATE_V2} messages
     *         whose entities container is {@code null} or holds an empty entity list;
     *         {@code false} for every other notification type
     */
    public boolean isEmptyMessage(AtlasKafkaMessage<HookNotification> atlasKafkaMessage) {
        HookNotification message = atlasKafkaMessage.getMessage();
        AtlasEntity.AtlasEntitiesWithExtInfo payload;
        switch (message.getType()) {
            case ENTITY_CREATE_V2:
                payload = ((HookNotification.EntityCreateRequestV2) message).getEntities();
                break;
            case ENTITY_FULL_UPDATE_V2:
                payload = ((HookNotification.EntityUpdateRequestV2) message).getEntities();
                break;
            default:
                return false;
        }
        return payload == null || CollectionUtils.isEmpty(payload.getEntities());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Copies the outcome of an entity-store mutation into the preprocessor context: the GUID
     * assignments, and the GUIDs of created and deleted entities. Headers that are
     * {@code null} or have no GUID are skipped.
     *
     * @param entityMutationResponse result of the mutation; ignored when {@code null}
     * @param preprocessorContext    context to update; ignored when {@code null}
     */
    public void recordProcessedEntities(EntityMutationResponse entityMutationResponse, PreprocessorContext preprocessorContext) {
        if (entityMutationResponse != null && preprocessorContext != null) {
            if (MapUtils.isNotEmpty(entityMutationResponse.getGuidAssignments())) {
                preprocessorContext.getGuidAssignments().putAll(entityMutationResponse.getGuidAssignments());
            }

            if (CollectionUtils.isNotEmpty(entityMutationResponse.getCreatedEntities())) {
                for (AtlasEntityHeader created : entityMutationResponse.getCreatedEntities()) {
                    if (created == null || created.getGuid() == null) {
                        continue;
                    }
                    preprocessorContext.getCreatedEntities().add(created.getGuid());
                }
            }

            if (CollectionUtils.isNotEmpty(entityMutationResponse.getDeletedEntities())) {
                for (AtlasEntityHeader deleted : entityMutationResponse.getDeletedEntities()) {
                    if (deleted == null || deleted.getGuid() == null) {
                        continue;
                    }
                    preprocessorContext.getDeletedEntities().add(deleted.getGuid());
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Rewrites object references held by the given entities so that references to already
     * processed entities use their assigned GUIDs.
     *
     * <p>For each entity whose type is known to the type registry, every non-null attribute
     * value whose attribute (or relationship attribute) definition is an object reference is
     * rewritten, and so is every non-null relationship-attribute value.
     *
     * @param list entities to update; nothing happens when {@code null} or empty
     * @param map  original GUID to assigned GUID; nothing happens when {@code null} or empty
     */
    public void updateProcessedEntityReferences(List<AtlasEntity> list, Map<String, String> map) {
        if (!CollectionUtils.isNotEmpty(list) || !MapUtils.isNotEmpty(map)) {
            return;
        }
        for (AtlasEntity entity : list) {
            AtlasEntityType entityType = this.typeRegistry.getEntityTypeByName(entity.getTypeName());
            if (entityType == null) {
                continue;
            }

            if (MapUtils.isNotEmpty(entity.getAttributes())) {
                for (Map.Entry<String, Object> attrEntry : entity.getAttributes().entrySet()) {
                    String attrName = attrEntry.getKey();
                    Object attrValue = attrEntry.getValue();
                    if (attrValue == null) {
                        continue;
                    }
                    AtlasStructType.AtlasAttribute attrDef = entityType.getAttribute(attrName);
                    if (attrDef == null) {
                        // Fall back to the relationship-attribute definition of the same name.
                        attrDef = entityType.getRelationshipAttribute(attrName, null);
                    }
                    if (attrDef != null && attrDef.isObjectRef()) {
                        updateProcessedEntityReferences(attrValue, map);
                    }
                }
            }

            if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) {
                for (Object relationshipValue : entity.getRelationshipAttributes().values()) {
                    if (relationshipValue != null) {
                        updateProcessedEntityReferences(relationshipValue, map);
                    }
                }
            }
        }
    }

    /**
     * Dispatches a reference value to the matching overload, checking in this order:
     * {@code AtlasObjectId}, {@code Collection}, {@code Map}. Any other value is left untouched.
     *
     * @param obj value that may hold one or more object references
     * @param map original GUID to assigned GUID
     */
    private void updateProcessedEntityReferences(Object obj, Map<String, String> map) {
        if (obj instanceof AtlasObjectId) {
            updateProcessedEntityReferences((AtlasObjectId) obj, map);
            return;
        }
        if (obj instanceof Collection) {
            updateProcessedEntityReferences((Collection) obj, map);
            return;
        }
        if (obj instanceof Map) {
            updateProcessedEntityReferences((Map) obj, map);
        }
    }

    /**
     * Points an object id at the assigned GUID when its current GUID is a key of {@code map}.
     * On a match the GUID is replaced and the type name and unique attributes are cleared.
     *
     * @param atlasObjectId reference to update in place
     * @param map           original GUID to assigned GUID
     */
    private void updateProcessedEntityReferences(AtlasObjectId atlasObjectId, Map<String, String> map) {
        String originalGuid = atlasObjectId.getGuid();
        if (originalGuid != null && map.containsKey(originalGuid)) {
            String assignedGuid = map.get(originalGuid);
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}(guid={}) is already processed; updating its reference to use assigned-guid={}", atlasObjectId.getTypeName(), originalGuid, assignedGuid);
            }
            atlasObjectId.setGuid(assignedGuid);
            atlasObjectId.setTypeName(null);
            atlasObjectId.setUniqueAttributes(null);
        }
    }

    /**
     * Map-shaped variant of the object-id update: when the value under {@code "guid"} is a
     * key of {@code map2}, the entry is replaced by the assigned GUID and the
     * {@code "typeName"} and unique-attributes entries are removed.
     *
     * @param map  reference in map form, updated in place
     * @param map2 original GUID to assigned GUID
     */
    private void updateProcessedEntityReferences(Map map, Map<String, String> map2) {
        Object originalGuid = map.get("guid");
        if (originalGuid != null && map2.containsKey(originalGuid)) {
            String assignedGuid = map2.get(originalGuid);
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}(guid={}) is already processed; updating its reference to use assigned-guid={}", map.get("typeName"), originalGuid, assignedGuid);
            }
            map.put("guid", assignedGuid);
            map.remove("typeName");
            map.remove(AtlasObjectId.KEY_UNIQUE_ATTRIBUTES);
        }
    }

    /**
     * Applies the reference update to every element of the collection.
     *
     * @param collection values that may hold object references
     * @param map        original GUID to assigned GUID
     */
    private void updateProcessedEntityReferences(Collection collection, Map<String, String> map) {
        for (Object element : collection) {
            updateProcessedEntityReferences(element, map);
        }
    }
}
