package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link JobManagerRunner} that ties the lifecycle of a {@link JobMasterServiceProcess} to the
 * leadership obtained through a {@link LeaderElectionService}.
 *
 * <p>The runner registers itself as {@link LeaderContender}. When leadership is granted it checks
 * the {@link RunningJobsRegistry} and, unless the job is already marked as done, creates a new
 * {@link JobMasterServiceProcess} via the {@link JobMasterServiceProcessFactory}. When leadership
 * is revoked, the current process is closed and a fresh gateway future is installed.
 *
 * <p>All mutable state is guarded by {@link #lock}. Grant and revoke handling is appended to
 * {@link #sequentialOperation} so that these operations are executed one after another.
 */
public class JobMasterServiceLeadershipRunner implements JobManagerRunner, LeaderContender {
    private static final Logger LOG = LoggerFactory.getLogger(JobMasterServiceLeadershipRunner.class);

    private final JobMasterServiceProcessFactory jobMasterServiceProcessFactory;
    private final LeaderElectionService leaderElectionService;
    private final RunningJobsRegistry runningJobsRegistry;
    private final LibraryCacheManager.ClassLoaderLease classLoaderLease;
    private final FatalErrorHandler fatalErrorHandler;

    /** Monitor guarding every {@code @GuardedBy("lock")} field and method of this class. */
    private final Object lock = new Object();

    /** Completed once the runner has been closed and its services have been released. */
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    /** Completed with the final result of the job, or exceptionally if the runner fails. */
    private final CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<>();

    @GuardedBy("lock")
    private State state = State.RUNNING;

    /** Tail of the chain of grant/revoke operations; new operations are appended to it. */
    @GuardedBy("lock")
    private CompletableFuture<Void> sequentialOperation = FutureUtils.completedVoidFuture();

    @GuardedBy("lock")
    private JobMasterServiceProcess jobMasterServiceProcess = JobMasterServiceProcess.waitingForLeadership();

    /** Gateway future of the current leadership term; replaced whenever leadership is revoked. */
    @GuardedBy("lock")
    private CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = new CompletableFuture<>();

    /** Whether {@link #cancel(Time)} was called since the current process was last stopped. */
    @GuardedBy("lock")
    private boolean hasCurrentLeaderBeenCancelled = false;

    /** Lifecycle states of the leadership runner. */
    public enum State {
        /** Initial state; leadership events are processed only in this state. */
        RUNNING,
        /** Set by {@link #closeAsync()}. */
        STOPPED,
        /** Set once the result of the current leader's process has been handled. */
        JOB_COMPLETED
    }

    /**
     * Creates a runner in state {@link State#RUNNING}. Nothing is started until {@link #start()}
     * is called.
     *
     * @param jobMasterServiceProcessFactory factory for the per-leadership service process; also
     *     provides the job id and the archived execution graphs used for synthetic results
     * @param leaderElectionService service with which this runner contends for leadership
     * @param runningJobsRegistry registry that is consulted and updated with the job's scheduling
     *     status
     * @param classLoaderLease lease that is released when the runner is closed
     * @param fatalErrorHandler handler that receives leader election errors and JVM fatal errors
     */
    public JobMasterServiceLeadershipRunner(JobMasterServiceProcessFactory jobMasterServiceProcessFactory, LeaderElectionService leaderElectionService, RunningJobsRegistry runningJobsRegistry, LibraryCacheManager.ClassLoaderLease classLoaderLease, FatalErrorHandler fatalErrorHandler) {
        this.jobMasterServiceProcessFactory = jobMasterServiceProcessFactory;
        this.leaderElectionService = leaderElectionService;
        this.runningJobsRegistry = runningJobsRegistry;
        this.classLoaderLease = classLoaderLease;
        this.fatalErrorHandler = fatalErrorHandler;
    }

    /**
     * Stops the runner. Only the first call that finds the runner not yet {@link State#STOPPED}
     * performs the shutdown; every call returns the same termination future.
     *
     * <p>The shutdown fails the current gateway future, tries to complete the result future with
     * a {@link JobStatus#SUSPENDED} result, closes the current service process and afterwards
     * releases the class loader lease and stops the leader election service.
     *
     * @return future that completes once the shutdown has finished
     */
    @Override // org.apache.flink.util.AutoCloseableAsync
    public CompletableFuture<Void> closeAsync() {
        synchronized (lock) {
            if (state != State.STOPPED) {
                state = State.STOPPED;
                LOG.debug("Terminating the leadership runner for job {}.", getJobID());

                jobMasterGatewayFuture.completeExceptionally(new FlinkException("JobMasterServiceLeadershipRunner is closed. Therefore, the corresponding JobMaster will never acquire the leadership."));
                // Has no effect if the result future has already been completed.
                resultFuture.complete(JobManagerRunnerResult.forSuccess(createExecutionGraphInfoWithJobStatus(JobStatus.SUSPENDED)));

                final CompletableFuture<Void> processTerminationFuture = jobMasterServiceProcess.closeAsync();
                final CompletableFuture<Void> serviceTerminationFuture = FutureUtils.runAfterwards(processTerminationFuture, () -> {
                    classLoaderLease.release();
                    leaderElectionService.stop();
                });
                FutureUtils.forward(serviceTerminationFuture, terminationFuture);

                terminationFuture.whenComplete((ignored, throwable) ->
                        LOG.debug("Leadership runner for job {} has been terminated.", getJobID()));
            }
        }
        return terminationFuture;
    }

    /**
     * Starts the leader election service with this runner as contender.
     *
     * @throws Exception if the leader election service fails to start
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public void start() throws Exception {
        LOG.debug("Start leadership runner for job {}.", getJobID());
        leaderElectionService.start(this);
    }

    /**
     * Returns the gateway future of the current leadership term.
     *
     * @return the gateway future that is current at the time of the call
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public CompletableFuture<JobMasterGateway> getJobMasterGateway() {
        synchronized (lock) {
            return jobMasterGatewayFuture;
        }
    }

    /**
     * Returns the future holding the final result of this runner.
     *
     * @return the result future; the same instance for the lifetime of the runner
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public CompletableFuture<JobManagerRunnerResult> getResultFuture() {
        return resultFuture;
    }

    /**
     * Returns the id of the job, as reported by the service process factory.
     *
     * @return the job id
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public JobID getJobID() {
        return jobMasterServiceProcessFactory.getJobId();
    }

    /**
     * Marks the current leader as cancelled and forwards the cancellation to the job master
     * gateway once it is available.
     *
     * @param time timeout passed on to the gateway call
     * @return future of the gateway's acknowledgement; on failure it completes with a {@link
     *     CompletionException} wrapping a {@link FlinkException} whose cause is the stripped
     *     original failure
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public CompletableFuture<Acknowledge> cancel(Time time) {
        synchronized (lock) {
            hasCurrentLeaderBeenCancelled = true;
            return getJobMasterGateway()
                    .thenCompose(jobMasterGateway -> jobMasterGateway.cancel(time))
                    .exceptionally(throwable -> {
                        throw new CompletionException(new FlinkException("Cancellation failed.", ExceptionUtils.stripCompletionException(throwable)));
                    });
        }
    }

    /**
     * Requests the current status of the job.
     *
     * @param time timeout passed on to {@link #requestJob(Time)}
     * @return future with the state of the archived execution graph returned by {@link
     *     #requestJob(Time)}
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public CompletableFuture<JobStatus> requestJobStatus(Time time) {
        return requestJob(time).thenApply(executionGraphInfo -> executionGraphInfo.getArchivedExecutionGraph().getState());
    }

    /**
     * Requests the details of the job.
     *
     * @param time timeout passed on to {@link #requestJob(Time)}
     * @return future with the job details derived from the archived execution graph returned by
     *     {@link #requestJob(Time)}
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public CompletableFuture<JobDetails> requestJobDetails(Time time) {
        return requestJob(time).thenApply(executionGraphInfo -> JobDetails.createDetailsForJob(executionGraphInfo.getArchivedExecutionGraph()));
    }

    /**
     * Requests the {@link ExecutionGraphInfo} of the job.
     *
     * <ul>
     *   <li>If the runner is no longer {@link State#RUNNING}, the info is taken from the result
     *       future.
     *   <li>If the service process is initialized and running, the request is forwarded to the
     *       job master gateway.
     *   <li>Otherwise a synthetic info is returned immediately, with status {@link
     *       JobStatus#CANCELLING} if the current leader has been cancelled and {@link
     *       JobStatus#INITIALIZING} if not.
     * </ul>
     *
     * @param time timeout passed on to the gateway call
     * @return future with the execution graph info
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public CompletableFuture<ExecutionGraphInfo> requestJob(Time time) {
        synchronized (lock) {
            if (state != State.RUNNING) {
                return resultFuture.thenApply(JobManagerRunnerResult::getExecutionGraphInfo);
            }
            if (jobMasterServiceProcess.isInitializedAndRunning()) {
                return getJobMasterGateway().thenCompose(jobMasterGateway -> jobMasterGateway.requestJob(time));
            }
            final JobStatus syntheticStatus = hasCurrentLeaderBeenCancelled ? JobStatus.CANCELLING : JobStatus.INITIALIZING;
            return CompletableFuture.completedFuture(createExecutionGraphInfoWithJobStatus(syntheticStatus));
        }
    }

    /**
     * Reports whether the current service process is initialized and running.
     *
     * @return the value of {@code isInitializedAndRunning()} of the current service process
     */
    @Override // org.apache.flink.runtime.jobmaster.JobManagerRunner
    public boolean isInitialized() {
        synchronized (lock) {
            return jobMasterServiceProcess.isInitializedAndRunning();
        }
    }

    /**
     * Callback of the leader election service: schedules the start of a new service process for
     * the given leader session. Ignored if the runner is no longer running.
     *
     * @param uuid leader session id under which leadership was granted
     */
    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void grantLeadership(UUID uuid) {
        runIfStateRunning(() -> startJobMasterServiceProcessAsync(uuid), "starting a new JobMasterServiceProcess");
    }

    /**
     * Appends the creation of a service process for {@code leaderSessionId} to the sequential
     * operation chain. The creation is skipped if the session is no longer the valid leader when
     * the chained step runs.
     */
    @GuardedBy("lock")
    private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
        sequentialOperation = sequentialOperation.thenRun(() ->
                runIfValidLeader(
                        leaderSessionId,
                        ThrowingRunnable.unchecked(() -> verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(leaderSessionId)),
                        "verify job scheduling status and create JobMasterServiceProcess"));
        handleAsyncOperationError(sequentialOperation, "Could not start the job manager.");
    }

    /**
     * Completes the result as "already done" if the registry reports the job as {@code DONE};
     * otherwise creates a new service process.
     *
     * @throws FlinkException if the scheduling status cannot be read or the job cannot be marked
     *     as running
     */
    @GuardedBy("lock")
    private void verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId) throws FlinkException {
        if (getJobSchedulingStatus() == RunningJobsRegistry.JobSchedulingStatus.DONE) {
            jobAlreadyDone();
        } else {
            createNewJobMasterServiceProcess(leaderSessionId);
        }
    }

    /** Builds an {@link ExecutionGraphInfo} around an archived graph with the given status and no failure cause. */
    private ExecutionGraphInfo createExecutionGraphInfoWithJobStatus(JobStatus jobStatus) {
        return new ExecutionGraphInfo(jobMasterServiceProcessFactory.createArchivedExecutionGraph(jobStatus, null));
    }

    /**
     * Completes the result future with a {@link JobStatus#FAILED} archived graph whose cause is a
     * {@link JobAlreadyDoneException}.
     */
    private void jobAlreadyDone() {
        final ExecutionGraphInfo alreadyDoneInfo = new ExecutionGraphInfo(
                jobMasterServiceProcessFactory.createArchivedExecutionGraph(JobStatus.FAILED, new JobAlreadyDoneException(getJobID())));
        resultFuture.complete(JobManagerRunnerResult.forSuccess(alreadyDoneInfo));
    }

    /**
     * Reads the scheduling status of the job from the running jobs registry.
     *
     * @throws FlinkException wrapping the {@link IOException} raised by the registry
     */
    private RunningJobsRegistry.JobSchedulingStatus getJobSchedulingStatus() throws FlinkException {
        try {
            return runningJobsRegistry.getJobSchedulingStatus(getJobID());
        } catch (IOException e) {
            throw new FlinkException(String.format("Could not retrieve the job scheduling status for job %s.", getJobID()), e);
        }
    }

    /**
     * Marks the job as running in the registry, creates the service process for the given leader
     * session and wires its gateway, result and leader address futures to this runner.
     *
     * @throws IllegalStateException (via {@link Preconditions#checkState(boolean)}) if the
     *     previous service process has not finished closing
     * @throws FlinkException if an {@link IOException} occurs while setting the process up
     */
    @GuardedBy("lock")
    private void createNewJobMasterServiceProcess(UUID leaderSessionId) throws FlinkException {
        // The previous process must be fully closed before a new one may be created.
        Preconditions.checkState(jobMasterServiceProcess.closeAsync().isDone());
        LOG.debug("Create new JobMasterServiceProcess because we were granted leadership under {}.", leaderSessionId);
        try {
            runningJobsRegistry.setJobRunning(getJobID());
            jobMasterServiceProcess = jobMasterServiceProcessFactory.create(leaderSessionId);
            forwardIfValidLeader(leaderSessionId, jobMasterServiceProcess.getJobMasterGatewayFuture(), jobMasterGatewayFuture, "JobMasterGatewayFuture from JobMasterServiceProcess");
            forwardResultFuture(leaderSessionId, jobMasterServiceProcess.getResultFuture());
            confirmLeadership(leaderSessionId, jobMasterServiceProcess.getLeaderAddressFuture());
        } catch (IOException e) {
            throw new FlinkException(String.format("Failed to set the job %s to running in the running jobs registry.", getJobID()), e);
        }
    }

    /**
     * Confirms leadership with the leader election service once the leader address is known,
     * provided the session is still the valid leader at that time.
     */
    private void confirmLeadership(UUID leaderSessionId, CompletableFuture<String> leaderAddressFuture) {
        FutureUtils.assertNoException(leaderAddressFuture.thenAccept(leaderAddress -> {
            synchronized (lock) {
                if (isValidLeader(leaderSessionId)) {
                    LOG.debug("Confirm leadership {}.", leaderSessionId);
                    leaderElectionService.confirmLeadership(leaderSessionId, leaderAddress);
                } else {
                    LOG.trace("Ignore confirming leadership because the leader {} is no longer valid.", leaderSessionId);
                }
            }
        }));
    }

    /**
     * Hands the outcome of the process' result future to {@link #onJobCompletion} if the session
     * is still the valid leader when the future completes; otherwise the outcome is ignored.
     */
    private void forwardResultFuture(UUID leaderSessionId, CompletableFuture<JobManagerRunnerResult> processResultFuture) {
        processResultFuture.whenComplete((jobManagerRunnerResult, throwable) -> {
            synchronized (lock) {
                if (isValidLeader(leaderSessionId)) {
                    onJobCompletion(jobManagerRunnerResult, throwable);
                } else {
                    LOG.trace("Ignore result future forwarding because the leader {} is no longer valid.", leaderSessionId);
                }
            }
        });
    }

    /**
     * Moves the runner to {@link State#JOB_COMPLETED} and completes the result future.
     *
     * <p>A failure fails both the result and the gateway future. A successful result marks the
     * job as finished in the registry (registry failures are only logged). An unsuccessful
     * result fails the gateway future with the initialization failure. In both non-exceptional
     * cases the result future is completed with the given result.
     *
     * @param jobManagerRunnerResult result of the service process; only read if {@code
     *     throwable} is {@code null}
     * @param throwable failure of the service process' result future, or {@code null}
     */
    @GuardedBy("lock")
    private void onJobCompletion(JobManagerRunnerResult jobManagerRunnerResult, Throwable throwable) {
        state = State.JOB_COMPLETED;
        LOG.debug("Completing the result for job {}.", getJobID());

        if (throwable != null) {
            resultFuture.completeExceptionally(throwable);
            jobMasterGatewayFuture.completeExceptionally(new FlinkException("Could not retrieve JobMasterGateway because the JobMaster failed.", throwable));
        } else {
            if (jobManagerRunnerResult.isSuccess()) {
                try {
                    runningJobsRegistry.setJobFinished(getJobID());
                } catch (IOException e) {
                    // Deliberately best effort: the result is still published below.
                    LOG.error("Could not un-register from high-availability services job {}.Other JobManager's may attempt to recover it and re-execute it.", getJobID(), e);
                }
            } else {
                jobMasterGatewayFuture.completeExceptionally(new FlinkException("Could not retrieve JobMasterGateway because the JobMaster initialization failed.", jobManagerRunnerResult.getInitializationFailure()));
            }
            resultFuture.complete(jobManagerRunnerResult);
        }
    }

    /**
     * Callback of the leader election service: schedules the stop of the current service process.
     * Ignored if the runner is no longer running.
     */
    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void revokeLeadership() {
        runIfStateRunning(this::stopJobMasterServiceProcessAsync, "revoke leadership from JobMasterServiceProcess");
    }

    /**
     * Appends the stop of the current service process to the sequential operation chain. If the
     * runner is no longer running when the chained step executes, the step completes immediately.
     */
    @GuardedBy("lock")
    private void stopJobMasterServiceProcessAsync() {
        sequentialOperation = sequentialOperation.thenCompose(ignored -> {
            final Optional<CompletableFuture<Void>> stopFuture =
                    callIfRunning(this::stopJobMasterServiceProcess, "stop leading JobMasterServiceProcess");
            return stopFuture.orElse(FutureUtils.completedVoidFuture());
        });
        handleAsyncOperationError(sequentialOperation, "Could not suspend the job manager.");
    }

    /**
     * Fails the current gateway future, installs a fresh one for the next leadership term, clears
     * the cancellation flag and closes the current service process.
     *
     * @return the close future of the current service process
     */
    @GuardedBy("lock")
    private CompletableFuture<Void> stopJobMasterServiceProcess() {
        LOG.debug("Stop current JobMasterServiceProcess because the leadership has been revoked.");
        jobMasterGatewayFuture.completeExceptionally(new FlinkException("Cannot obtain JobMasterGateway because the JobMaster lost leadership."));
        jobMasterGatewayFuture = new CompletableFuture<>();
        hasCurrentLeaderBeenCancelled = false;
        return jobMasterServiceProcess.closeAsync();
    }

    /**
     * Callback of the leader election service: passes the error to the fatal error handler.
     *
     * @param exc error reported by the leader election service
     */
    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void handleError(Exception exc) {
        fatalErrorHandler.onFatalError(exc);
    }

    /**
     * Reports a failure of the given operation, wrapped in a {@link FlinkException} carrying
     * {@code message}, as long as the runner is still running at that time.
     */
    private void handleAsyncOperationError(CompletableFuture<Void> operation, String message) {
        operation.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                runIfStateRunning(
                        () -> handleJobMasterServiceLeadershipRunnerError(new FlinkException(message, throwable)),
                        "handle JobMasterServiceLeadershipRunner error");
            }
        });
    }

    /** Escalates JVM fatal errors to the fatal error handler; any other error fails the result future. */
    private void handleJobMasterServiceLeadershipRunnerError(Throwable cause) {
        if (ExceptionUtils.isJvmFatalError(cause)) {
            fatalErrorHandler.onFatalError(cause);
        } else {
            resultFuture.completeExceptionally(cause);
        }
    }

    /** Runs {@code action} while holding the lock if the runner is running; otherwise logs and skips it. */
    private void runIfStateRunning(Runnable action, String actionDescription) {
        synchronized (lock) {
            if (isRunning()) {
                action.run();
            } else {
                LOG.trace("Ignore '{}' because the leadership runner is no longer running.", actionDescription);
            }
        }
    }

    /**
     * Evaluates {@code supplier} while holding the lock if the runner is running.
     *
     * @return the supplied value, or an empty {@link Optional} if the runner is not running
     */
    private <T> Optional<T> callIfRunning(Supplier<? extends T> supplier, String supplierDescription) {
        synchronized (lock) {
            if (isRunning()) {
                return Optional.of(supplier.get());
            }
            LOG.trace("Ignore '{}' because the leadership runner is no longer running.", supplierDescription);
            return Optional.empty();
        }
    }

    @GuardedBy("lock")
    private boolean isRunning() {
        return state == State.RUNNING;
    }

    /** Runs {@code action} while holding the lock if the session is still the valid leader; otherwise logs and skips it. */
    private void runIfValidLeader(UUID leaderSessionId, Runnable action, String actionDescription) {
        synchronized (lock) {
            if (isValidLeader(leaderSessionId)) {
                action.run();
            } else {
                LOG.trace("Ignore leader action '{}' because the leadership runner is no longer the valid leader for {}.", actionDescription, leaderSessionId);
            }
        }
    }

    /** A session is a valid leader if the runner is running and the election service confirms its leadership. */
    @GuardedBy("lock")
    private boolean isValidLeader(UUID leaderSessionId) {
        return isRunning() && leaderElectionService.hasLeadership(leaderSessionId);
    }

    /**
     * Forwards the outcome of {@code source} to {@code target} if the session is still the valid
     * leader when {@code source} completes; otherwise the outcome is dropped.
     */
    private <T> void forwardIfValidLeader(UUID leaderSessionId, CompletableFuture<? extends T> source, CompletableFuture<T> target, String forwardDescription) {
        source.whenComplete((value, throwable) -> {
            synchronized (lock) {
                if (!isValidLeader(leaderSessionId)) {
                    LOG.trace("Ignore forwarding '{}' because the leadership runner is no longer the valid leader for {}.", forwardDescription, leaderSessionId);
                } else if (throwable != null) {
                    target.completeExceptionally(throwable);
                } else {
                    target.complete(value);
                }
            }
        });
    }
}
