package org.apache.flink.kubernetes.highavailability;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounter.class */
public class KubernetesCheckpointIDCounter implements CheckpointIDCounter {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KubernetesCheckpointIDCounter.class);
    private final FlinkKubeClient kubeClient;
    private final String configMapName;
    private final String lockIdentity;
    private boolean running = false;

    public KubernetesCheckpointIDCounter(FlinkKubeClient flinkKubeClient, String str, String str2) {
        this.kubeClient = (FlinkKubeClient) Preconditions.checkNotNull(flinkKubeClient);
        this.configMapName = (String) Preconditions.checkNotNull(str);
        this.lockIdentity = (String) Preconditions.checkNotNull(str2);
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointIDCounter
    public void start() {
        if (this.running) {
            return;
        }
        this.running = true;
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointIDCounter
    public void shutdown(JobStatus jobStatus) {
        if (this.running) {
            this.running = false;
            LOG.info("Shutting down.");
            if (jobStatus.isGloballyTerminalState()) {
                LOG.info("Removing counter from ConfigMap {}", this.configMapName);
                this.kubeClient.checkAndUpdateConfigMap(this.configMapName, kubernetesConfigMap -> {
                    if (!KubernetesLeaderElector.hasLeadership(kubernetesConfigMap, this.lockIdentity)) {
                        return Optional.empty();
                    }
                    kubernetesConfigMap.getData().remove(Constants.CHECKPOINT_COUNTER_KEY);
                    return Optional.of(kubernetesConfigMap);
                });
            }
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointIDCounter
    public long getAndIncrement() throws Exception {
        AtomicLong atomicLong = new AtomicLong();
        if (this.kubeClient.checkAndUpdateConfigMap(this.configMapName, kubernetesConfigMap -> {
            if (!KubernetesLeaderElector.hasLeadership(kubernetesConfigMap, this.lockIdentity)) {
                return Optional.empty();
            }
            long currentCounter = getCurrentCounter(kubernetesConfigMap);
            atomicLong.set(currentCounter);
            kubernetesConfigMap.getData().put(Constants.CHECKPOINT_COUNTER_KEY, String.valueOf(currentCounter + 1));
            return Optional.of(kubernetesConfigMap);
        }).get().booleanValue()) {
            return atomicLong.get();
        }
        throw new KubernetesException("Failed to update ConfigMap " + this.configMapName + " since current KubernetesCheckpointIDCounter does not have the leadership.");
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointIDCounter
    public long get() {
        return ((Long) this.kubeClient.getConfigMap(this.configMapName).map(this::getCurrentCounter).orElseThrow(() -> {
            return new FlinkRuntimeException(new KubernetesException("ConfigMap " + this.configMapName + " does not exist."));
        })).longValue();
    }

    @Override // org.apache.flink.runtime.checkpoint.CheckpointIDCounter
    public void setCount(long j) throws Exception {
        this.kubeClient.checkAndUpdateConfigMap(this.configMapName, kubernetesConfigMap -> {
            if (KubernetesLeaderElector.hasLeadership(kubernetesConfigMap, this.lockIdentity)) {
                String str = kubernetesConfigMap.getData().get(Constants.CHECKPOINT_COUNTER_KEY);
                String valueOf = String.valueOf(j);
                if (str == null || !str.equals(valueOf)) {
                    kubernetesConfigMap.getData().put(Constants.CHECKPOINT_COUNTER_KEY, String.valueOf(j));
                    return Optional.of(kubernetesConfigMap);
                }
            }
            return Optional.empty();
        }).get();
    }

    private long getCurrentCounter(KubernetesConfigMap kubernetesConfigMap) {
        if (kubernetesConfigMap.getData().containsKey(Constants.CHECKPOINT_COUNTER_KEY)) {
            return Long.valueOf(kubernetesConfigMap.getData().get(Constants.CHECKPOINT_COUNTER_KEY)).longValue();
        }
        return 1L;
    }
}
