package org.apache.atlas.hook;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.kafka.NotificationProvider;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.security.InMemoryJAASConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/atlas-notification-1.2.0.jar:org/apache/atlas/hook/AtlasHook.class */
public abstract class AtlasHook {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AtlasHook.class);
    public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS = "atlas.notification.hook.asynchronous";
    public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS_MIN_THREADS = "atlas.notification.hook.asynchronous.minThreads";
    public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS_MAX_THREADS = "atlas.notification.hook.asynchronous.maxThreads";
    public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS_KEEP_ALIVE_TIME_MS = "atlas.notification.hook.asynchronous.keepAliveTimeMs";
    public static final String ATLAS_NOTIFICATION_ASYNCHRONOUS_QUEUE_SIZE = "atlas.notification.hook.asynchronous.queueSize";
    public static final String ATLAS_NOTIFICATION_MAX_RETRIES = "atlas.notification.hook.retry.maxRetries";
    public static final String ATLAS_NOTIFICATION_RETRY_INTERVAL = "atlas.notification.hook.retry.interval";
    public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY = "atlas.notification.failed.messages.filename";
    public static final String ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY = "atlas.notification.log.failed.messages";
    public static final String ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME = "atlas_hook_failed_messages.log";
    protected static Configuration atlasProperties;
    protected static NotificationInterface notificationInterface;
    private static final int SHUTDOWN_HOOK_WAIT_TIME_MS = 3000;
    private static final boolean logFailedMessages;
    private static final FailedMessagesLogger failedMessagesLogger;
    private static final int notificationMaxRetries;
    private static final int notificationRetryInterval;
    private static ExecutorService executor;

    public static void notifyEntities(final List<HookNotification> list, final UserGroupInformation userGroupInformation, final int i) {
        if (executor == null) {
            notifyEntitiesInternal(list, i, userGroupInformation, notificationInterface, logFailedMessages, failedMessagesLogger);
        } else {
            executor.submit(new Runnable() { // from class: org.apache.atlas.hook.AtlasHook.2
                @Override // java.lang.Runnable
                public void run() {
                    AtlasHook.notifyEntitiesInternal(list, i, userGroupInformation, AtlasHook.notificationInterface, AtlasHook.logFailedMessages, AtlasHook.failedMessagesLogger);
                }
            });
        }
    }

    @VisibleForTesting
    static void notifyEntitiesInternal(final List<HookNotification> list, int i, UserGroupInformation userGroupInformation, final NotificationInterface notificationInterface2, boolean z, FailedMessagesLogger failedMessagesLogger2) {
        if (list == null || list.isEmpty()) {
            return;
        }
        int i2 = i < 1 ? 1 : i;
        Exception exc = null;
        for (int i3 = 1; i3 <= i2; i3++) {
            if (i3 > 1) {
                try {
                    LOG.debug("Sleeping for {} ms before retry", Integer.valueOf(notificationRetryInterval));
                    Thread.sleep(notificationRetryInterval);
                } catch (InterruptedException e) {
                    LOG.error("Notification hook thread sleep interrupted");
                }
            }
            try {
                if (userGroupInformation == null) {
                    notificationInterface2.send(NotificationInterface.NotificationType.HOOK, list);
                } else {
                    userGroupInformation.doAs(new PrivilegedExceptionAction<Object>() { // from class: org.apache.atlas.hook.AtlasHook.3
                        @Override // java.security.PrivilegedExceptionAction
                        public Object run() throws Exception {
                            NotificationInterface.this.send(NotificationInterface.NotificationType.HOOK, list);
                            return list;
                        }
                    });
                }
                exc = null;
                break;
            } catch (Exception e2) {
                exc = e2;
                LOG.error("Failed to send notification - attempt #{}; error={}", Integer.valueOf(i3), e2.getMessage());
            }
        }
        if (exc != null) {
            if (z && (exc instanceof NotificationException)) {
                Iterator<String> it2 = ((NotificationException) exc).getFailedMessages().iterator();
                while (it2.hasNext()) {
                    failedMessagesLogger2.log(it2.next());
                }
            }
            LOG.error("Giving up after {} failed attempts to send notification to Atlas: {}", Integer.valueOf(i2), list.toString(), exc);
        }
    }

    protected void notifyEntities(List<HookNotification> list, UserGroupInformation userGroupInformation) {
        notifyEntities(list, userGroupInformation, notificationMaxRetries);
    }

    public static String getUser() {
        return getUser(null, null);
    }

    public static String getUser(String str) {
        return getUser(str, null);
    }

    public static String getUser(String str, UserGroupInformation userGroupInformation) {
        if (StringUtils.isNotEmpty(str)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Returning userName {}", str);
            }
            return str;
        }
        if (userGroupInformation != null && StringUtils.isNotEmpty(userGroupInformation.getShortUserName())) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Returning ugi.getShortUserName {}", str);
            }
            return userGroupInformation.getShortUserName();
        }
        try {
            return UserGroupInformation.getCurrentUser().getShortUserName();
        } catch (IOException e) {
            LOG.warn("Failed for UserGroupInformation.getCurrentUser() ", (Throwable) e);
            return System.getProperty("user.name");
        }
    }

    private static boolean isLoginKeytabBased() {
        boolean z = false;
        try {
            z = UserGroupInformation.isLoginKeytabBased();
        } catch (Exception e) {
            LOG.warn("Error in determining keytab for KafkaClient-JAAS config", (Throwable) e);
        }
        return z;
    }

    private static boolean isLoginTicketBased() {
        boolean z = false;
        try {
            z = UserGroupInformation.isLoginTicketBased();
        } catch (Exception e) {
            LOG.warn("Error in determining ticket-cache for KafkaClient-JAAS config", (Throwable) e);
        }
        return z;
    }

    static {
        executor = null;
        try {
            atlasProperties = ApplicationProperties.get();
        } catch (Exception e) {
            LOG.info("Failed to load application properties", (Throwable) e);
        }
        String string = atlasProperties.getString(ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY, ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME);
        logFailedMessages = atlasProperties.getBoolean(ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY, true);
        if (logFailedMessages) {
            failedMessagesLogger = new FailedMessagesLogger(string);
            failedMessagesLogger.init();
        } else {
            failedMessagesLogger = null;
        }
        if (!isLoginKeytabBased() && isLoginTicketBased()) {
            InMemoryJAASConfiguration.setConfigSectionRedirect("KafkaClient", "ticketBased-KafkaClient");
        }
        notificationMaxRetries = atlasProperties.getInt(ATLAS_NOTIFICATION_MAX_RETRIES, 3);
        notificationRetryInterval = atlasProperties.getInt(ATLAS_NOTIFICATION_RETRY_INTERVAL, 1000);
        notificationInterface = NotificationProvider.get();
        String str = "";
        try {
            str = getUser();
        } catch (Exception e2) {
            LOG.warn("Error in determining current user", (Throwable) e2);
        }
        notificationInterface.setCurrentUser(str);
        if (atlasProperties.getBoolean(ATLAS_NOTIFICATION_ASYNCHRONOUS, Boolean.TRUE).booleanValue()) {
            executor = new ThreadPoolExecutor(atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_MIN_THREADS, 1), atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_MAX_THREADS, 1), atlasProperties.getLong(ATLAS_NOTIFICATION_ASYNCHRONOUS_KEEP_ALIVE_TIME_MS, 10000L), TimeUnit.MILLISECONDS, new LinkedBlockingDeque(atlasProperties.getInt(ATLAS_NOTIFICATION_ASYNCHRONOUS_QUEUE_SIZE, 10000)), new ThreadFactoryBuilder().setNameFormat("Atlas Notifier %d").setDaemon(true).build());
            ShutdownHookManager.get().addShutdownHook(new Thread() { // from class: org.apache.atlas.hook.AtlasHook.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        try {
                            AtlasHook.LOG.info("==> Shutdown of Atlas Hook");
                            AtlasHook.executor.shutdown();
                            AtlasHook.executor.awaitTermination(3000L, TimeUnit.MILLISECONDS);
                            ExecutorService unused = AtlasHook.executor = null;
                            AtlasHook.LOG.info("<== Shutdown of Atlas Hook");
                        } catch (InterruptedException e3) {
                            AtlasHook.LOG.info("Interrupt received in shutdown.", (Throwable) e3);
                            AtlasHook.LOG.info("<== Shutdown of Atlas Hook");
                        }
                    } catch (Throwable th) {
                        AtlasHook.LOG.info("<== Shutdown of Atlas Hook");
                        throw th;
                    }
                }
            }, 30);
        }
        LOG.info("Created Atlas Hook");
    }
}
