package org.apache.flink.kubernetes.highavailability;

import java.io.IOException;
import java.util.Optional;
import org.apache.flink.api.common.JobID;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/highavailability/KubernetesRunningJobsRegistry.class */
/**
 * {@link RunningJobsRegistry} implementation that keeps the scheduling status of each job as an
 * entry in the data of a single Kubernetes ConfigMap.
 *
 * <p>Each job is stored under the key {@code Constants.RUNNING_JOBS_REGISTRY_KEY_PREFIX + jobId}
 * with the {@link RunningJobsRegistry.JobSchedulingStatus} enum constant name as its value.
 * Write operations only modify the ConfigMap when
 * {@link KubernetesLeaderElector#hasLeadership} reports leadership for this instance's lock
 * identity; otherwise they hand back an empty {@link Optional} to the client
 * (NOTE(review): presumably interpreted by the client as "nothing to update" - confirm against
 * {@code FlinkKubeClient#checkAndUpdateConfigMap}).
 */
public class KubernetesRunningJobsRegistry implements RunningJobsRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesRunningJobsRegistry.class);

    private final FlinkKubeClient kubeClient;
    private final String configMapName;
    private final String lockIdentity;

    /**
     * Creates a registry backed by the given ConfigMap.
     *
     * @param kubeClient client used to read and update the ConfigMap; must not be null
     * @param configMapName name of the ConfigMap holding the job status entries; must not be null
     * @param lockIdentity identity passed to the leadership check before every write; must not be
     *     null
     */
    public KubernetesRunningJobsRegistry(
            FlinkKubeClient kubeClient, String configMapName, String lockIdentity) {
        this.kubeClient = Preconditions.checkNotNull(kubeClient);
        this.configMapName = Preconditions.checkNotNull(configMapName);
        this.lockIdentity = Preconditions.checkNotNull(lockIdentity);
    }

    /**
     * Records the job as {@code RUNNING} in the ConfigMap.
     *
     * @param jobID job to mark; must not be null
     * @throws IOException if the ConfigMap update fails
     */
    @Override
    public void setJobRunning(JobID jobID) throws IOException {
        Preconditions.checkNotNull(jobID);
        writeJobStatusToConfigMap(jobID, RunningJobsRegistry.JobSchedulingStatus.RUNNING);
    }

    /**
     * Records the job as {@code DONE} in the ConfigMap.
     *
     * @param jobID job to mark; must not be null
     * @throws IOException if the ConfigMap update fails
     */
    @Override
    public void setJobFinished(JobID jobID) throws IOException {
        Preconditions.checkNotNull(jobID);
        writeJobStatusToConfigMap(jobID, RunningJobsRegistry.JobSchedulingStatus.DONE);
    }

    /**
     * Reads the scheduling status stored for the job.
     *
     * @param jobID job to look up; must not be null
     * @return the stored status, or {@code PENDING} when the ConfigMap has no (non-blank) entry
     *     for the job
     * @throws IOException if the ConfigMap does not exist
     * @throws IllegalArgumentException if the stored value is not the name of a
     *     {@link RunningJobsRegistry.JobSchedulingStatus} constant
     */
    @Override
    public RunningJobsRegistry.JobSchedulingStatus getJobSchedulingStatus(JobID jobID)
            throws IOException {
        Preconditions.checkNotNull(jobID);
        return this.kubeClient
                .getConfigMap(this.configMapName)
                .map(
                        configMap ->
                                getJobStatus(configMap, jobID)
                                        .orElse(RunningJobsRegistry.JobSchedulingStatus.PENDING))
                .orElseThrow(
                        () ->
                                new IOException(
                                        "ConfigMap " + this.configMapName + " does not exist."));
    }

    /**
     * Removes the job's entry from the ConfigMap, if this instance holds leadership and an entry
     * exists.
     *
     * @param jobID job whose entry should be removed; must not be null
     * @throws IOException if the ConfigMap update fails; when the failure is an interruption, the
     *     calling thread's interrupt flag is restored before throwing
     */
    @Override
    public void clearJob(JobID jobID) throws IOException {
        Preconditions.checkNotNull(jobID);
        final String key = getKeyForJobId(jobID);
        try {
            this.kubeClient
                    .checkAndUpdateConfigMap(
                            this.configMapName,
                            configMap -> {
                                // Leadership is checked first so the data is never touched by a
                                // non-leader.
                                if (!KubernetesLeaderElector.hasLeadership(
                                        configMap, this.lockIdentity)) {
                                    return Optional.empty();
                                }
                                // remove() returns null when there was no entry: nothing changed.
                                if (configMap.getData().remove(key) == null) {
                                    return Optional.empty();
                                }
                                return Optional.of(configMap);
                            })
                    .get();
        } catch (Exception e) {
            throw asIOException(
                    "Failed to clear job state in ConfigMap "
                            + this.configMapName
                            + " for job "
                            + jobID,
                    e);
        }
    }

    /**
     * Stores {@code jobSchedulingStatus} for the job, skipping the write when this instance is not
     * the leader or the same status is already stored.
     *
     * @throws IOException if the ConfigMap update fails; when the failure is an interruption, the
     *     calling thread's interrupt flag is restored before throwing
     */
    private void writeJobStatusToConfigMap(
            JobID jobID, RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus)
            throws IOException {
        LOG.debug("Setting scheduling state for job {} to {}.", jobID, jobSchedulingStatus);
        final String key = getKeyForJobId(jobID);
        try {
            this.kubeClient
                    .checkAndUpdateConfigMap(
                            this.configMapName,
                            configMap -> {
                                if (!KubernetesLeaderElector.hasLeadership(
                                        configMap, this.lockIdentity)) {
                                    return Optional.empty();
                                }
                                final Optional<RunningJobsRegistry.JobSchedulingStatus> current =
                                        getJobStatus(configMap, jobID);
                                // Avoid a redundant write when the desired status is already set.
                                if (current.isPresent() && current.get() == jobSchedulingStatus) {
                                    return Optional.empty();
                                }
                                configMap.getData().put(key, jobSchedulingStatus.name());
                                return Optional.of(configMap);
                            })
                    .get();
        } catch (Exception e) {
            throw asIOException(
                    "Failed to set "
                            + jobSchedulingStatus.name()
                            + " state in ConfigMap "
                            + this.configMapName
                            + " for job "
                            + jobID,
                    e);
        }
    }

    /**
     * Wraps a failure in an {@link IOException}, first restoring the thread's interrupt flag when
     * the failure is an {@link InterruptedException} so the interruption is not lost to callers.
     */
    private static IOException asIOException(String message, Exception cause) {
        if (cause instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        return new IOException(message, cause);
    }

    /**
     * Parses the status stored for the job in the given ConfigMap.
     *
     * @return the parsed status, or empty when the entry is missing or blank
     * @throws IllegalArgumentException if the stored value is not an enum constant name
     */
    private Optional<RunningJobsRegistry.JobSchedulingStatus> getJobStatus(
            KubernetesConfigMap configMap, JobID jobID) {
        final String rawStatus = configMap.getData().get(getKeyForJobId(jobID));
        if (StringUtils.isNullOrWhitespaceOnly(rawStatus)) {
            return Optional.empty();
        }
        return Optional.of(RunningJobsRegistry.JobSchedulingStatus.valueOf(rawStatus));
    }

    /** Builds the ConfigMap data key under which the job's status is stored. */
    private String getKeyForJobId(JobID jobID) {
        return Constants.RUNNING_JOBS_REGISTRY_KEY_PREFIX + jobID.toString();
    }
}
