/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2.app.speculate;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock;

public class DefaultSpeculator
extends AbstractService
implements Speculator {
    // ---- Sentinel results of speculationValue() ----
    // All sentinels sit at the very bottom of the long range, so they can never beat the
    // initial "best" value of -1 used in maybeScheduleASpeculation().

    /** Returned when the estimator's threshold runtime for the task is Long.MAX_VALUE. */
    private static final long ON_SCHEDULE = Long.MIN_VALUE;
    /** Returned when the task has more than one RUNNING/STARTING attempt. */
    private static final long ALREADY_SPECULATING = -9223372036854775807L;
    /** Returned when the attempt's enrolled time lies after "now". */
    private static final long TOO_NEW = -9223372036854775806L;
    /** Returned when the attempt's estimated end time is already before "now". */
    private static final long PROGRESS_IS_GOOD = -9223372036854775805L;
    /** Returned when the task has no RUNNING/STARTING attempt. */
    private static final long NOT_RUNNING = -9223372036854775804L;
    /** Returned when a replacement attempt would not finish before the current attempt. */
    private static final long TOO_LATE_TO_SPECULATE = -9223372036854775803L;

    // ---- Tunables (the methods below use the same values as inlined literals) ----

    /** Minimum wait, in milliseconds, after a scan that launched no speculation. */
    private static final long SOONEST_RETRY_AFTER_NO_SPECULATE = 1000L;
    /** Minimum wait, in milliseconds, after a scan that launched at least one speculation. */
    private static final long SOONEST_RETRY_AFTER_SPECULATE = 15000L;
    /** Fraction of running tasks that may be speculated at once. */
    private static final double PROPORTION_RUNNING_TASKS_SPECULATABLE = 0.1;
    /** Fraction of all tasks of one type that may be speculated at once. */
    private static final double PROPORTION_TOTAL_TASKS_SPECULATABLE = 0.01;
    /** Lower bound on the number of allowed speculative tasks, whatever the proportions give. */
    private static final int MINIMUM_ALLOWED_SPECULATIVE_TASKS = 10;
    private static final Log LOG = LogFactory.getLog(DefaultSpeculator.class);
    /** Tasks whose latest status update reported RUNNING; maintained by statusUpdate(). */
    private final ConcurrentMap<TaskId, Boolean> runningTasks = new ConcurrentHashMap<TaskId, Boolean>();
    /** Last observed estimate/progress per attempt, used to detect attempts that stopped reporting. */
    private final ConcurrentMap<TaskAttemptId, TaskAttemptHistoryStatistics> runningTaskAttemptStatistics = new ConcurrentHashMap<TaskAttemptId, TaskAttemptHistoryStatistics>();
    /** Milliseconds without a change in estimate/progress before an attempt counts as not heartbeating. */
    private static final long MAX_WAITTING_TIME_FOR_HEARTBEAT = 9000L;
    /** Per-job counter adjusted by TASK_CONTAINER_NEED_UPDATE events for map tasks. */
    private final ConcurrentMap<JobId, AtomicInteger> mapContainerNeeds = new ConcurrentHashMap<JobId, AtomicInteger>();
    /** Per-job counter adjusted by TASK_CONTAINER_NEED_UPDATE events for reduce tasks. */
    private final ConcurrentMap<JobId, AtomicInteger> reduceContainerNeeds = new ConcurrentHashMap<JobId, AtomicInteger>();
    /** Tasks for which addSpeculativeAttempt() has been called. Plain HashSet, not thread-safe. */
    private final Set<TaskId> mayHaveSpeculated = new HashSet<TaskId>();
    /** Configuration passed to the constructor. */
    private final Configuration conf;
    private AppContext context;
    /** Thread created by serviceStart() and interrupted by serviceStop(); null before start. */
    private Thread speculationBackgroundThread = null;
    /** Set by serviceStop(); checked by the background loop to decide whether to keep running. */
    private volatile boolean stopped = false;
    // NOTE(review): nothing in this file enqueues into eventQueue; it is only inspected by eventQueueEmpty().
    private BlockingQueue<SpeculatorEvent> eventQueue = new LinkedBlockingQueue<SpeculatorEvent>();
    private TaskRuntimeEstimator estimator;
    /** The background thread does its timed wait on this queue; scanForSpeculations() adds a token to cut the wait short. */
    private BlockingQueue<Object> scanControl = new LinkedBlockingQueue<Object>();
    private final Clock clock;
    /** Handler obtained from the AppContext; receives T_ADD_SPEC_ATTEMPT task events. */
    private final EventHandler<TaskEvent> eventHandler;

    /**
     * Creates a speculator that uses the clock supplied by the application context.
     *
     * @param conf    configuration used to select and contextualize the runtime estimator
     * @param context application context; its {@code getClock()} result becomes this speculator's clock
     */
    public DefaultSpeculator(Configuration conf, AppContext context) {
        this(conf, context, context.getClock());
    }

    /**
     * Creates a speculator with an explicit clock; the runtime estimator is built from the
     * configuration via {@code getEstimator(conf, context)}.
     *
     * @param conf    configuration used to select and contextualize the runtime estimator
     * @param context application context
     * @param clock   time source used for all timestamps taken by this speculator
     */
    public DefaultSpeculator(Configuration conf, AppContext context, Clock clock) {
        this(conf, context, DefaultSpeculator.getEstimator(conf, context), clock);
    }

    /**
     * Instantiates the {@link TaskRuntimeEstimator} named by the configuration key
     * {@code yarn.app.mapreduce.am.job.task.estimator.class} (defaulting to
     * {@link LegacyTaskRuntimeEstimator}) through its public no-argument constructor and
     * contextualizes it with the given configuration and application context.
     *
     * @param conf    configuration that names the estimator class
     * @param context application context handed to the estimator
     * @return a contextualized estimator instance
     * @throws YarnRuntimeException if the estimator class has no accessible no-argument
     *         constructor or its construction fails; the reflective exception is the cause
     */
    private static TaskRuntimeEstimator getEstimator(Configuration conf, AppContext context) {
        try {
            Class<? extends TaskRuntimeEstimator> estimatorClass = conf.getClass("yarn.app.mapreduce.am.job.task.estimator.class", LegacyTaskRuntimeEstimator.class, TaskRuntimeEstimator.class);
            Constructor<? extends TaskRuntimeEstimator> estimatorConstructor = estimatorClass.getConstructor();
            TaskRuntimeEstimator estimator = estimatorConstructor.newInstance();
            estimator.contextualize(conf, context);
            return estimator;
        }
        catch (InstantiationException ex) {
            throw DefaultSpeculator.estimatorCreationFailure(ex);
        }
        catch (IllegalAccessException ex) {
            throw DefaultSpeculator.estimatorCreationFailure(ex);
        }
        catch (InvocationTargetException ex) {
            throw DefaultSpeculator.estimatorCreationFailure(ex);
        }
        catch (NoSuchMethodException ex) {
            throw DefaultSpeculator.estimatorCreationFailure(ex);
        }
    }

    /** Logs a failed estimator instantiation and wraps its cause for the caller to throw. */
    private static YarnRuntimeException estimatorCreationFailure(Exception cause) {
        LOG.error("Can't make a speculation runtime estimator", cause);
        return new YarnRuntimeException(cause);
    }

    /**
     * Creates a speculator from fully specified collaborators. The service name is this
     * class's fully qualified name, and the task event handler is taken from the context.
     *
     * @param conf      configuration retained by this speculator
     * @param context   application context; also the source of the event handler
     * @param estimator runtime estimator consulted when evaluating tasks
     * @param clock     time source used for all timestamps taken by this speculator
     */
    public DefaultSpeculator(Configuration conf, AppContext context, TaskRuntimeEstimator estimator, Clock clock) {
        super(DefaultSpeculator.class.getName());
        // Collaborators handed in by the caller.
        this.clock = clock;
        this.estimator = estimator;
        this.conf = conf;
        // Context and the handler derived from it.
        this.context = context;
        this.eventHandler = context.getEventHandler();
    }

    /**
     * Starts the background thread that periodically scans for speculation opportunities,
     * then delegates to the superclass.
     *
     * <p>Each iteration runs {@code computeSpeculations()} and then waits on
     * {@code scanControl}. The wait is the larger of the scan's own duration and a minimum
     * pause: {@code SOONEST_RETRY_AFTER_SPECULATE} if anything was launched, otherwise
     * {@code SOONEST_RETRY_AFTER_NO_SPECULATE}. A token placed in {@code scanControl} ends
     * the wait early. The loop exits once {@code stopped} is set or the thread is interrupted.
     *
     * @throws Exception if the superclass start fails
     */
    protected void serviceStart() throws Exception {
        Runnable scanLoop = new Runnable(){

            @Override
            public void run() {
                while (true) {
                    if (stopped || Thread.currentThread().isInterrupted()) {
                        return;
                    }
                    long scanStartedAt = clock.getTime();
                    try {
                        int launched = computeSpeculations();
                        long minimumPause = SOONEST_RETRY_AFTER_NO_SPECULATE;
                        if (launched > 0) {
                            minimumPause = SOONEST_RETRY_AFTER_SPECULATE;
                        }
                        long pause = Math.max(minimumPause, clock.getTime() - scanStartedAt);
                        if (launched > 0) {
                            LOG.info("We launched " + launched + " speculations.  Sleeping " + pause + " milliseconds.");
                        }
                        // Timed wait; the polled token carries no information and is discarded.
                        scanControl.poll(pause, TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException e) {
                        // An interrupt is the normal shutdown signal; only report it when unexpected.
                        if (!stopped) {
                            LOG.error("Background thread returning, interrupted", e);
                        }
                        return;
                    }
                }
            }
        };
        Thread worker = new Thread(scanLoop, "DefaultSpeculator background processing");
        this.speculationBackgroundThread = worker;
        worker.start();
        super.serviceStart();
    }

    /**
     * Signals the background scan loop to finish by setting {@code stopped} and interrupting
     * the background thread if one was created, then delegates to the superclass. Does not
     * wait for the thread to terminate.
     *
     * @throws Exception if the superclass stop fails
     */
    protected void serviceStop() throws Exception {
        this.stopped = true;
        Thread worker = this.speculationBackgroundThread;
        if (worker != null) {
            worker.interrupt();
        }
        super.serviceStop();
    }

    /**
     * Applies an attempt status report immediately, stamped with the current clock time.
     *
     * @param status the reported attempt status
     */
    @Override
    public void handleAttempt(TaskAttemptStatusUpdateEvent.TaskAttemptStatus status) {
        this.statusUpdate(status, this.clock.getTime());
    }

    /**
     * Reports whether the internal event queue currently holds no events.
     *
     * @return {@code true} if {@code eventQueue} is empty
     */
    public boolean eventQueueEmpty() {
        return this.eventQueue.isEmpty();
    }

    /**
     * Requests an immediate speculation scan. A token is added to {@code scanControl}, which
     * ends the background thread's timed wait early, and the calling thread then yields.
     *
     * <p>Diagnostics go through the class logger rather than standard output.
     */
    public void scanForSpeculations() {
        LOG.info("We got asked to run a debug speculation scan.");
        LOG.info("There are " + this.scanControl.size() + " events stacked already.");
        this.scanControl.add(new Object());
        // Hint to the scheduler so the background thread can pick up the token promptly.
        Thread.yield();
    }

    /**
     * Returns the container-need counter for the job owning {@code taskID}, creating it with
     * an initial value of zero on first use. Map tasks use {@code mapContainerNeeds}; all
     * other task types use {@code reduceContainerNeeds}.
     *
     * @param taskID task whose job and type select the counter
     * @return the counter stored for that job and task type
     */
    private AtomicInteger containerNeed(TaskId taskID) {
        JobId owningJob = taskID.getJobId();
        ConcurrentMap<JobId, AtomicInteger> needsByJob;
        if (taskID.getTaskType() == TaskType.MAP) {
            needsByJob = this.mapContainerNeeds;
        } else {
            needsByJob = this.reduceContainerNeeds;
        }
        AtomicInteger existing = needsByJob.get(owningJob);
        if (existing != null) {
            return existing;
        }
        // Another thread may insert concurrently; putIfAbsent keeps whichever arrived first,
        // and the re-read returns the counter that is actually in the map.
        needsByJob.putIfAbsent(owningJob, new AtomicInteger(0));
        return needsByJob.get(owningJob);
    }

    /**
     * Dispatches a speculator event by type while holding this object's monitor.
     * <ul>
     *   <li>{@code ATTEMPT_STATUS_UPDATE}: applies the reported status via {@code statusUpdate}.</li>
     *   <li>{@code TASK_CONTAINER_NEED_UPDATE}: adds the event's delta to the job's container-need counter.</li>
     *   <li>{@code ATTEMPT_START}: enrolls the attempt with the estimator.</li>
     *   <li>{@code JOB_CREATE}: re-contextualizes the estimator with the service configuration.</li>
     * </ul>
     * Any other event type is ignored.
     *
     * @param event the event to process
     */
    private synchronized void processSpeculatorEvent(SpeculatorEvent event) {
        Speculator.EventType kind = (Speculator.EventType)event.getType();
        switch (kind) {
            case JOB_CREATE: {
                LOG.info("JOB_CREATE " + event.getJobID());
                this.estimator.contextualize(this.getConfig(), this.context);
                break;
            }
            case ATTEMPT_START: {
                LOG.info("ATTEMPT_START " + event.getTaskID());
                this.estimator.enrollAttempt(event.getReportedStatus(), event.getTimestamp());
                break;
            }
            case TASK_CONTAINER_NEED_UPDATE: {
                AtomicInteger counter = this.containerNeed(event.getTaskID());
                counter.addAndGet(event.containersNeededChange());
                break;
            }
            case ATTEMPT_STATUS_UPDATE: {
                this.statusUpdate(event.getReportedStatus(), event.getTimestamp());
                break;
            }
            default: {
                break;
            }
        }
    }

    /**
     * Folds one attempt status report into the estimator and the running-task bookkeeping.
     *
     * <p>Reports for jobs or tasks the context does not know are dropped. Otherwise the
     * estimator is updated. A RUNNING report marks the task as running. Any other state
     * unmarks it and, unless the state is STARTING, also discards the attempt's heartbeat
     * statistics.
     *
     * @param reportedStatus the status reported for the attempt
     * @param timestamp      time associated with the report
     */
    protected void statusUpdate(TaskAttemptStatusUpdateEvent.TaskAttemptStatus reportedStatus, long timestamp) {
        String reportedState = reportedStatus.taskState.toString();
        TaskAttemptId attemptID = reportedStatus.id;
        TaskId taskID = attemptID.getTaskId();

        Job owningJob = this.context.getJob(taskID.getJobId());
        if (owningJob == null || owningJob.getTask(taskID) == null) {
            return;
        }

        this.estimator.updateAttempt(reportedStatus, timestamp);

        if (TaskAttemptState.RUNNING.name().equals(reportedState)) {
            this.runningTasks.putIfAbsent(taskID, Boolean.TRUE);
            return;
        }
        this.runningTasks.remove(taskID, Boolean.TRUE);
        boolean stillStarting = TaskAttemptState.STARTING.name().equals(reportedState);
        if (!stillStarting) {
            this.runningTaskAttemptStatistics.remove(attemptID);
        }
    }

    /**
     * Scores how worthwhile it is to launch a speculative attempt for a task.
     *
     * <p>A real score is the estimated number of milliseconds by which a fresh attempt
     * would beat the single active attempt (estimated end minus estimated replacement end).
     * Otherwise one of the sentinel constants is returned: {@code ON_SCHEDULE},
     * {@code ALREADY_SPECULATING}, {@code TOO_NEW}, {@code PROGRESS_IS_GOOD},
     * {@code TOO_LATE_TO_SPECULATE} or {@code NOT_RUNNING}.
     *
     * <p>Side effect: heartbeat statistics are kept for the active attempt. If neither its
     * estimate nor its progress has changed for longer than the heartbeat timeout, a
     * synthetic status update is fed back through {@code handleAttempt}.
     *
     * @param taskID the task to evaluate; its job and task must be known to the context
     * @param now    current time as seen by the caller
     * @return a speculation score, or one of the sentinel constants
     */
    private long speculationValue(TaskId taskID, long now) {
        Job job = this.context.getJob(taskID.getJobId());
        Task task = job.getTask(taskID);
        Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();

        // Long.MIN_VALUE marks "threshold not fetched yet".
        long acceptableRuntime = Long.MIN_VALUE;
        if (!this.mayHaveSpeculated.contains(taskID)) {
            acceptableRuntime = this.estimator.thresholdRuntime(taskID);
            if (acceptableRuntime == Long.MAX_VALUE) {
                return ON_SCHEDULE;
            }
        }

        long result = Long.MIN_VALUE;
        int activeAttempts = 0;
        for (TaskAttempt attempt : attempts.values()) {
            boolean active = attempt.getState() == TaskAttemptState.RUNNING || attempt.getState() == TaskAttemptState.STARTING;
            if (!active) {
                continue;
            }
            activeAttempts++;
            if (activeAttempts > 1) {
                return ALREADY_SPECULATING;
            }

            TaskAttemptId activeAttemptID = attempt.getID();
            long estimatedRunTime = this.estimator.estimatedRuntime(activeAttemptID);
            long enrolledAt = this.estimator.attemptEnrolledTime(activeAttemptID);
            if (enrolledAt > now) {
                return TOO_NEW;
            }
            long estimatedEndTime = estimatedRunTime + enrolledAt;
            long estimatedReplacementEndTime = now + this.estimator.estimatedNewAttemptRuntime(taskID);

            float progress = attempt.getProgress();
            TaskAttemptHistoryStatistics seen = this.runningTaskAttemptStatistics.get(activeAttemptID);
            if (seen == null) {
                this.runningTaskAttemptStatistics.put(activeAttemptID, new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));
            } else if (estimatedRunTime != seen.getEstimatedRunTime() || progress != seen.getProgress()) {
                // Something moved since the last scan: remember the new values and restart the timer.
                seen.setEstimatedRunTime(estimatedRunTime);
                seen.setProgress(progress);
                seen.resetHeartBeatTime(now);
            } else if (seen.notHeartbeatedInAWhile(now)) {
                // Nothing moved for too long: replay the attempt's current state as a status update.
                TaskAttemptStatusUpdateEvent.TaskAttemptStatus replayed = new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
                replayed.id = activeAttemptID;
                replayed.progress = progress;
                replayed.taskState = attempt.getState();
                this.handleAttempt(replayed);
            }

            if (estimatedEndTime < now) {
                return PROGRESS_IS_GOOD;
            }
            if (estimatedReplacementEndTime >= estimatedEndTime) {
                return TOO_LATE_TO_SPECULATE;
            }
            result = estimatedEndTime - estimatedReplacementEndTime;
        }

        if (activeAttempts == 0) {
            return NOT_RUNNING;
        }
        if (acceptableRuntime == Long.MIN_VALUE) {
            // Threshold was skipped above (task may already have been speculated); check it now.
            acceptableRuntime = this.estimator.thresholdRuntime(taskID);
            if (acceptableRuntime == Long.MAX_VALUE) {
                return ON_SCHEDULE;
            }
        }
        return result;
    }

    /**
     * Requests a speculative attempt for a task by sending a {@code T_ADD_SPEC_ATTEMPT}
     * task event to the event handler, then records the task in {@code mayHaveSpeculated}.
     *
     * @param taskID the task to speculate
     */
    protected void addSpeculativeAttempt(TaskId taskID) {
        LOG.info("DefaultSpeculator.addSpeculativeAttempt -- we are speculating " + taskID);
        TaskEvent request = new TaskEvent(taskID, TaskEventType.T_ADD_SPEC_ATTEMPT);
        this.eventHandler.handle(request);
        this.mayHaveSpeculated.add(taskID);
    }

    /**
     * Processes a speculator event synchronously on the calling thread by delegating to
     * {@code processSpeculatorEvent}.
     *
     * @param event the event to process
     */
    public void handle(SpeculatorEvent event) {
        this.processSpeculatorEvent(event);
    }

    /**
     * Runs one speculation pass over map tasks.
     *
     * @return the number of speculative attempts requested
     */
    private int maybeScheduleAMapSpeculation() {
        return this.maybeScheduleASpeculation(TaskType.MAP);
    }

    /**
     * Runs one speculation pass over reduce tasks.
     *
     * @return the number of speculative attempts requested
     */
    private int maybeScheduleAReduceSpeculation() {
        return this.maybeScheduleASpeculation(TaskType.REDUCE);
    }

    /**
     * Runs one speculation pass for the given task type, requesting at most one speculative
     * attempt per job.
     *
     * <p>Only jobs whose container-need counter for this type is zero or below are
     * considered. For each such job every task is scored with {@code speculationValue}, and
     * the task with the highest non-sentinel score is speculated. This happens only while
     * the number of tasks already speculating is below the allowance: the largest of
     * {@code MINIMUM_ALLOWED_SPECULATIVE_TASKS}, {@code PROPORTION_TOTAL_TASKS_SPECULATABLE}
     * of all tasks, and {@code PROPORTION_RUNNING_TASKS_SPECULATABLE} of running tasks.
     *
     * <p>Jobs the application context does not know are skipped, matching the null
     * handling in {@code statusUpdate}, so an unknown job id cannot end the background
     * scan thread with a {@code NullPointerException}.
     *
     * @param type the task type to scan
     * @return the number of speculative attempts requested in this pass
     */
    private int maybeScheduleASpeculation(TaskType type) {
        int successes = 0;
        long now = this.clock.getTime();
        ConcurrentMap<JobId, AtomicInteger> containerNeeds = type == TaskType.MAP ? this.mapContainerNeeds : this.reduceContainerNeeds;
        for (Map.Entry<JobId, AtomicInteger> jobEntry : containerNeeds.entrySet()) {
            // A positive counter means the job still needs containers for this type; skip it.
            if (jobEntry.getValue().get() > 0) {
                continue;
            }
            Job job = this.context.getJob(jobEntry.getKey());
            if (job == null) {
                continue;
            }
            Map<TaskId, Task> tasks = job.getTasks(type);

            int numberSpeculationsAlready = 0;
            int numberRunningTasks = 0;
            TaskId bestTaskID = null;
            // Sentinel scores are hugely negative, so starting at -1 filters all of them out.
            long bestSpeculationValue = -1L;
            for (Map.Entry<TaskId, Task> taskEntry : tasks.entrySet()) {
                long mySpeculationValue = this.speculationValue(taskEntry.getKey(), now);
                if (mySpeculationValue == ALREADY_SPECULATING) {
                    ++numberSpeculationsAlready;
                }
                if (mySpeculationValue != NOT_RUNNING) {
                    ++numberRunningTasks;
                }
                if (mySpeculationValue > bestSpeculationValue) {
                    bestTaskID = taskEntry.getKey();
                    bestSpeculationValue = mySpeculationValue;
                }
            }

            int numberAllowedSpeculativeTasks = (int)Math.max((double)MINIMUM_ALLOWED_SPECULATIVE_TASKS, PROPORTION_TOTAL_TASKS_SPECULATABLE * (double)tasks.size());
            numberAllowedSpeculativeTasks = (int)Math.max((double)numberAllowedSpeculativeTasks, PROPORTION_RUNNING_TASKS_SPECULATABLE * (double)numberRunningTasks);
            if (bestTaskID != null && numberAllowedSpeculativeTasks > numberSpeculationsAlready) {
                this.addSpeculativeAttempt(bestTaskID);
                ++successes;
            }
        }
        return successes;
    }

    /**
     * Runs one full speculation scan: map tasks first, then reduce tasks.
     *
     * @return the total number of speculative attempts requested by both passes
     */
    private int computeSpeculations() {
        return this.maybeScheduleAMapSpeculation() + this.maybeScheduleAReduceSpeculation();
    }

    /**
     * Mutable snapshot of what was last observed for one task attempt: its estimated
     * runtime, its progress, and the time of the last "heartbeat". Here a heartbeat is the
     * moment the snapshot was created or last reset.
     */
    static class TaskAttemptHistoryStatistics {
        private long estimatedRunTime;
        private float progress;
        private long lastHeartBeatTime;

        /**
         * @param estimatedRunTime     initial estimated runtime of the attempt
         * @param progress             initial progress of the attempt
         * @param nonProgressStartTime time used as the initial heartbeat time
         */
        public TaskAttemptHistoryStatistics(long estimatedRunTime, float progress, long nonProgressStartTime) {
            this.estimatedRunTime = estimatedRunTime;
            this.progress = progress;
            this.resetHeartBeatTime(nonProgressStartTime);
        }

        /** @return the last recorded estimated runtime */
        public long getEstimatedRunTime() {
            return this.estimatedRunTime;
        }

        /** @param estimatedRunTime the new estimated runtime to record */
        public void setEstimatedRunTime(long estimatedRunTime) {
            this.estimatedRunTime = estimatedRunTime;
        }

        /** @return the last recorded progress */
        public float getProgress() {
            return this.progress;
        }

        /** @param progress the new progress to record */
        public void setProgress(float progress) {
            this.progress = progress;
        }

        /** @param lastHeartBeatTime the time to record as the most recent heartbeat */
        public void resetHeartBeatTime(long lastHeartBeatTime) {
            this.lastHeartBeatTime = lastHeartBeatTime;
        }

        /**
         * Tells whether more than 9000 milliseconds have passed since the recorded heartbeat
         * time. When that is the case the heartbeat time is reset to {@code now}, so the
         * next positive answer comes only after another full timeout.
         *
         * @param now the current time
         * @return {@code true} if the timeout was exceeded (and the timer was restarted)
         */
        public boolean notHeartbeatedInAWhile(long now) {
            boolean timedOut = now - this.lastHeartBeatTime > 9000L;
            if (timedOut) {
                this.resetHeartBeatTime(now);
            }
            return timedOut;
        }
    }
}

