/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregator;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;

/**
 * NodeManager service that reacts to {@link LogHandlerEvent}s by creating, running and
 * stopping one {@link AppLogAggregator} per application.
 *
 * <p>Aggregators are tracked in a concurrent map keyed by {@link ApplicationId} and are
 * executed on a cached thread pool owned by this service. An aggregator removes itself
 * from the map (and the file systems cached for its user are closed) when its
 * {@code run()} method returns.
 */
public class LogAggregationService extends AbstractService implements LogHandler {
    private static final Log LOG = LogFactory.getLog(LogAggregationService.class);

    /** Configuration key for the remote root log directory, and its default value. */
    private static final String REMOTE_APP_LOG_DIR_KEY = "yarn.nodemanager.remote-app-log-dir";
    private static final String DEFAULT_REMOTE_APP_LOG_DIR = "/tmp/logs";

    /** Configuration key for the remote log directory suffix, and its default value. */
    private static final String REMOTE_APP_LOG_DIR_SUFFIX_KEY =
            "yarn.nodemanager.remote-app-log-dir-suffix";
    private static final String DEFAULT_REMOTE_APP_LOG_DIR_SUFFIX = "logs";

    /** Expected mode of the remote root log dir: octal 01777 (rwx for everyone plus sticky bit). */
    private static final FsPermission TLDIR_PERMISSIONS =
            FsPermission.createImmutable((short) 01777);

    /** Mode applied to the per-user, suffix and per-application directories: octal 0770. */
    private static final FsPermission APP_DIR_PERMISSIONS =
            FsPermission.createImmutable((short) 0770);

    private final Context context;
    private final DeletionService deletionService;
    private final Dispatcher dispatcher;
    private LocalDirsHandlerService dirsHandler;
    Path remoteRootLogDir;
    String remoteRootLogDirSuffix;
    private NodeId nodeId;
    private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
    private final ExecutorService threadPool;

    /**
     * Creates the service with an empty aggregator map and a cached thread pool whose
     * threads are named {@code "LogAggregationService #N"}.
     *
     * @param dispatcher      dispatcher used to publish application events
     * @param context         NodeManager context (node id, state store, decommission flag)
     * @param deletionService handed to every aggregator that is created
     * @param dirsHandler     handed to every aggregator that is created
     */
    public LogAggregationService(Dispatcher dispatcher, Context context, DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
        super(LogAggregationService.class.getName());
        this.dispatcher = dispatcher;
        this.context = context;
        this.deletionService = deletionService;
        this.dirsHandler = dirsHandler;
        this.appLogAggregators = new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
        this.threadPool = Executors.newCachedThreadPool(
                new ThreadFactoryBuilder().setNameFormat("LogAggregationService #%d").build());
    }

    /**
     * Reads the remote root log directory and its suffix from the configuration.
     *
     * @param conf service configuration
     * @throws Exception if the superclass initialisation fails
     */
    protected void serviceInit(Configuration conf) throws Exception {
        this.remoteRootLogDir =
                new Path(conf.get(REMOTE_APP_LOG_DIR_KEY, DEFAULT_REMOTE_APP_LOG_DIR));
        this.remoteRootLogDirSuffix =
                conf.get(REMOTE_APP_LOG_DIR_SUFFIX_KEY, DEFAULT_REMOTE_APP_LOG_DIR_SUFFIX);
        super.serviceInit(conf);
    }

    /**
     * Captures the node id from the context; it is only read here, not in the constructor.
     *
     * @throws Exception if the superclass start fails
     */
    protected void serviceStart() throws Exception {
        this.nodeId = this.context.getNodeId();
        super.serviceStart();
    }

    /**
     * Stops all aggregators (waiting for them to finish) before stopping the service.
     *
     * @throws Exception if the superclass stop fails
     */
    protected void serviceStop() throws Exception {
        LOG.info(this.getName() + " waiting for pending aggregation during exit");
        this.stopAggregators();
        super.serviceStop();
    }

    /**
     * Shuts the thread pool down and waits for the running aggregators to terminate.
     *
     * <p>Aggregators are aborted when the state store can recover and the node is not
     * decommissioned; otherwise they are asked to finish normally. Every 30 seconds
     * without termination the pool is interrupted via {@code shutdownNow()}.
     */
    private void stopAggregators() {
        this.threadPool.shutdown();
        boolean shouldAbort =
                this.context.getNMStateStore().canRecover() && !this.context.getDecommissioned();
        for (AppLogAggregator aggregator : this.appLogAggregators.values()) {
            if (shouldAbort) {
                aggregator.abortLogAggregation();
            } else {
                aggregator.finishLogAggregation();
            }
        }
        while (!this.threadPool.isTerminated()) {
            for (ApplicationId appId : this.appLogAggregators.keySet()) {
                LOG.info("Waiting for aggregation to complete for " + appId);
            }
            try {
                if (!this.threadPool.awaitTermination(30L, TimeUnit.SECONDS)) {
                    // Interrupt the workers to hurry them along, then re-check termination.
                    this.threadPool.shutdownNow();
                }
            } catch (InterruptedException e) {
                LOG.warn("Aggregation stop interrupted!");
                // Restore the interrupt status so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                break;
            }
        }
        // Anything still registered at this point did not run to completion.
        for (ApplicationId appId : this.appLogAggregators.keySet()) {
            LOG.warn("Some logs may not have been aggregated for " + appId);
        }
    }

    /**
     * Returns the file system used for remote log storage. Overridable (protected) hook.
     *
     * @param conf configuration used to look the file system up
     * @return the file system returned by {@code FileSystem.get(conf)}
     * @throws IOException if the file system cannot be obtained
     */
    protected FileSystem getFileSystem(Configuration conf) throws IOException {
        return FileSystem.get(conf);
    }

    /**
     * Checks that the remote root log directory exists, creating it with
     * {@link #TLDIR_PERMISSIONS} when it does not. An existing directory with different
     * permissions only triggers a warning; it is not modified.
     *
     * @param conf configuration used to obtain the remote file system
     * @throws YarnRuntimeException if the file system cannot be obtained, the directory
     *         cannot be inspected, or it cannot be created
     */
    void verifyAndCreateRemoteLogDir(Configuration conf) {
        final FileSystem remoteFS;
        try {
            remoteFS = this.getFileSystem(conf);
        } catch (IOException e) {
            throw new YarnRuntimeException("Unable to get Remote FileSystem instance", e);
        }

        boolean remoteExists = true;
        try {
            FsPermission perms = remoteFS.getFileStatus(this.remoteRootLogDir).getPermission();
            if (!perms.equals(TLDIR_PERMISSIONS)) {
                LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
                        + "] already exist, but with incorrect permissions. "
                        + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms + "]."
                        + " The cluster may have problems with multiple users.");
            }
        } catch (FileNotFoundException e) {
            remoteExists = false;
        } catch (IOException e) {
            throw new YarnRuntimeException(
                    "Failed to check permissions for dir [" + this.remoteRootLogDir + "]", e);
        }

        if (remoteExists) {
            return;
        }
        LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
                + "] does not exist. Attempting to create it.");
        try {
            Path qualified = this.qualify(remoteFS, this.remoteRootLogDir);
            remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
            // Set the mode explicitly as well, independent of what mkdirs applied.
            remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
        } catch (IOException e) {
            throw new YarnRuntimeException(
                    "Failed to create remoteLogDir [" + this.remoteRootLogDir + "]", e);
        }
    }

    /**
     * @param appId application whose log file path is wanted
     * @param user  owner of the application
     * @return the remote log file path for this node and application, as computed by
     *         {@link LogAggregationUtils}
     */
    Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) {
        return LogAggregationUtils.getRemoteNodeLogFileForApp(
                this.remoteRootLogDir, appId, user, this.nodeId, this.remoteRootLogDirSuffix);
    }

    /**
     * @param appId application whose log directory is wanted
     * @param user  owner of the application
     * @return the remote application log directory, as computed by {@link LogAggregationUtils}
     */
    Path getRemoteAppLogDir(ApplicationId appId, String user) {
        return LogAggregationUtils.getRemoteAppLogDir(
                this.remoteRootLogDir, appId, user, this.remoteRootLogDirSuffix);
    }

    /** Qualifies {@code path} against the URI and working directory of {@code fs}. */
    private Path qualify(FileSystem fs, Path path) {
        return path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    }

    /**
     * Creates {@code path} with the requested permission, following up with an explicit
     * {@code setPermission} when the file system's configured umask would alter it.
     *
     * @throws IOException if the directory cannot be created or its mode cannot be set
     */
    private void createDir(FileSystem fs, Path path, FsPermission fsPerm) throws IOException {
        FsPermission dirPerm = new FsPermission(fsPerm);
        fs.mkdirs(path, dirPerm);
        FsPermission umask = FsPermission.getUMask(fs.getConf());
        if (!dirPerm.equals(dirPerm.applyUMask(umask))) {
            fs.setPermission(path, new FsPermission(fsPerm));
        }
    }

    /**
     * Reports whether {@code path} exists; when it does and its permission differs from
     * {@code fsPerm}, the permission is reset to {@code fsPerm}.
     *
     * @return {@code true} if the path exists, {@code false} if it was not found
     * @throws IOException on any failure other than the path being absent
     */
    private boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm) throws IOException {
        try {
            FileStatus status = fs.getFileStatus(path);
            if (!fsPerm.equals(status.getPermission())) {
                fs.setPermission(path, fsPerm);
            }
            return true;
        } catch (FileNotFoundException fnfe) {
            return false;
        }
    }

    /**
     * Ensures the remote directory chain user dir / suffix dir / app dir exists, creating
     * the missing levels from the outermost inwards with {@link #APP_DIR_PERMISSIONS}.
     * Existing levels get their permission corrected by {@link #checkExists}.
     *
     * @throws IOException if any file system operation fails
     */
    private void setupRemoteAppDir(String user, ApplicationId appId) throws IOException {
        FileSystem remoteFS = this.getFileSystem(this.getConfig());
        Path appDir = this.qualify(remoteFS, LogAggregationUtils.getRemoteAppLogDir(
                this.remoteRootLogDir, appId, user, this.remoteRootLogDirSuffix));
        if (this.checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
            return;
        }
        Path suffixDir = this.qualify(remoteFS, LogAggregationUtils.getRemoteLogSuffixedDir(
                this.remoteRootLogDir, user, this.remoteRootLogDirSuffix));
        if (!this.checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
            Path userDir = this.qualify(remoteFS,
                    LogAggregationUtils.getRemoteLogUserDir(this.remoteRootLogDir, user));
            if (!this.checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
                this.createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
            }
            this.createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
        }
        this.createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
    }

    /**
     * Creates the remote log directory for an application while running as {@code userUgi}.
     *
     * @param user    owner of the application
     * @param appId   application whose directory is created
     * @param userUgi identity under which the file system operations are performed
     * @throws YarnRuntimeException wrapping any exception raised during setup
     */
    protected void createAppDir(final String user, final ApplicationId appId, UserGroupInformation userUgi) {
        try {
            userUgi.doAs(new PrivilegedExceptionAction<Object>() {
                @Override
                public Object run() throws Exception {
                    try {
                        LogAggregationService.this.setupRemoteAppDir(user, appId);
                    } catch (IOException e) {
                        LOG.error("Failed to setup application log directory for " + appId, e);
                        throw e;
                    }
                    return null;
                }
            });
        } catch (Exception e) {
            throw new YarnRuntimeException(e);
        }
    }

    /**
     * Handles an application start: verifies the remote root dir, creates the aggregator
     * and dispatches either {@code APPLICATION_LOG_HANDLING_INITED} or, when a
     * {@link YarnRuntimeException} is raised, {@code APPLICATION_LOG_HANDLING_FAILED}.
     */
    private void initApp(ApplicationId appId, String user, Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy, Map<ApplicationAccessType, String> appAcls, LogAggregationContext logAggregationContext) {
        ApplicationEvent eventResponse;
        try {
            this.verifyAndCreateRemoteLogDir(this.getConfig());
            this.initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls,
                    logAggregationContext);
            eventResponse = new ApplicationEvent(
                    appId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
        } catch (YarnRuntimeException e) {
            LOG.warn("Application failed to init aggregation", e);
            eventResponse = new ApplicationEvent(
                    appId, ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED);
        }
        this.dispatcher.getEventHandler().handle((Event) eventResponse);
    }

    /**
     * @param conf configuration used to build the context
     * @return a {@link FileContext} for the local file system
     * @throws YarnRuntimeException if the local file system cannot be accessed; the
     *         underlying {@link IOException} is attached as the cause
     */
    FileContext getLocalFileContext(Configuration conf) {
        try {
            return FileContext.getLocalFSFileContext(conf);
        } catch (IOException e) {
            throw new YarnRuntimeException("Failed to access local fs", e);
        }
    }

    /**
     * Registers a new aggregator for {@code appId}, creates its remote directory and
     * schedules it on the thread pool.
     *
     * <p>If directory creation fails, the registration is rolled back and the file
     * systems for the user are closed before the failure is rethrown.
     *
     * @throws YarnRuntimeException if an aggregator is already registered for
     *         {@code appId}, or if the remote application directory cannot be created
     */
    protected void initAppAggregator(final ApplicationId appId, String user, Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy, Map<ApplicationAccessType, String> appAcls, LogAggregationContext logAggregationContext) {
        final UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);
        if (credentials != null) {
            userUgi.addCredentials(credentials);
        }

        final AppLogAggregatorImpl appLogAggregator = new AppLogAggregatorImpl(
                this.dispatcher, this.deletionService, this.getConfig(), appId, userUgi,
                this.nodeId, this.dirsHandler, this.getRemoteNodeLogFileForApp(appId, user),
                logRetentionPolicy, appAcls, logAggregationContext, this.context,
                this.getLocalFileContext(this.getConfig()));
        if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
            throw new YarnRuntimeException("Duplicate initApp for " + appId);
        }

        try {
            this.createAppDir(user, appId, userUgi);
        } catch (Exception e) {
            // Roll back the registration so a later retry is not seen as a duplicate.
            this.appLogAggregators.remove(appId);
            this.closeFileSystems(userUgi);
            if (e instanceof YarnRuntimeException) {
                throw (YarnRuntimeException) e;
            }
            throw new YarnRuntimeException(e);
        }

        Runnable aggregatorWrapper = new Runnable() {
            @Override
            public void run() {
                try {
                    appLogAggregator.run();
                } finally {
                    // Always deregister and release per-user file systems, even on failure.
                    LogAggregationService.this.appLogAggregators.remove(appId);
                    LogAggregationService.this.closeFileSystems(userUgi);
                }
            }
        };
        this.threadPool.execute(aggregatorWrapper);
    }

    /**
     * Closes all file systems cached for {@code userUgi}; a failure is logged, not thrown.
     *
     * @param userUgi identity whose file systems are closed
     */
    protected void closeFileSystems(UserGroupInformation userUgi) {
        try {
            FileSystem.closeAllForUGI(userUgi);
        } catch (IOException e) {
            LOG.warn("Failed to close filesystems: ", e);
        }
    }

    /** @return the number of aggregators currently registered */
    @InterfaceAudience.Private
    int getNumAggregators() {
        return this.appLogAggregators.size();
    }

    /**
     * Tells the owning application's aggregator to start aggregating a finished
     * container's logs; the second argument is {@code true} only for exit code 0.
     */
    private void stopContainer(ContainerId containerId, int exitCode) {
        ApplicationId appId = containerId.getApplicationAttemptId().getApplicationId();
        AppLogAggregator aggregator = this.appLogAggregators.get(appId);
        if (aggregator == null) {
            LOG.warn("Log aggregation is not initialized for " + containerId
                    + ", did it fail to start?");
            return;
        }
        aggregator.startContainerLogAggregation(containerId, exitCode == 0);
    }

    /** Asks the aggregator registered for {@code appId}, if any, to finish. */
    private void stopApp(ApplicationId appId) {
        AppLogAggregator aggregator = this.appLogAggregators.get(appId);
        if (aggregator == null) {
            LOG.warn("Log aggregation is not initialized for " + appId
                    + ", did it fail to start?");
            return;
        }
        aggregator.finishLogAggregation();
    }

    /**
     * Dispatches a log handler event to the matching private handler. Event types other
     * than the three handled below are ignored.
     *
     * @param event the event to process
     */
    @Override
    public void handle(LogHandlerEvent event) {
        switch (event.getType()) {
            case APPLICATION_STARTED: {
                LogHandlerAppStartedEvent appStartEvent = (LogHandlerAppStartedEvent) event;
                this.initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
                        appStartEvent.getCredentials(), appStartEvent.getLogRetentionPolicy(),
                        appStartEvent.getApplicationAcls(),
                        appStartEvent.getLogAggregationContext());
                break;
            }
            case CONTAINER_FINISHED: {
                LogHandlerContainerFinishedEvent containerFinishEvent =
                        (LogHandlerContainerFinishedEvent) event;
                this.stopContainer(containerFinishEvent.getContainerId(),
                        containerFinishEvent.getExitCode());
                break;
            }
            case APPLICATION_FINISHED: {
                LogHandlerAppFinishedEvent appFinishedEvent = (LogHandlerAppFinishedEvent) event;
                this.stopApp(appFinishedEvent.getApplicationId());
                break;
            }
            default: {
                // Nothing to do for any other event type.
                break;
            }
        }
    }

    /** @return the live (not copied) map of registered aggregators */
    @VisibleForTesting
    public ConcurrentMap<ApplicationId, AppLogAggregator> getAppLogAggregators() {
        return this.appLogAggregators;
    }

    /** @return the node id captured in {@link #serviceStart()}; {@code null} before start */
    @VisibleForTesting
    public NodeId getNodeId() {
        return this.nodeId;
    }
}

