/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.server.namenode.CheckpointConf;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;

@InterfaceAudience.Private
public class StandbyCheckpointer {
    private static final Log LOG = LogFactory.getLog(StandbyCheckpointer.class);
    private static final long PREVENT_AFTER_CANCEL_MS = 120000L;
    private final CheckpointConf checkpointConf;
    private final FSNamesystem namesystem;
    private long lastCheckpointTime;
    private final CheckpointerThread thread;
    private String activeNNAddress;
    private InetSocketAddress myNNAddress;
    private Object cancelLock = new Object();
    private Canceler canceler;
    private static int canceledCount = 0;

    public StandbyCheckpointer(Configuration conf, FSNamesystem ns) throws IOException {
        this.namesystem = ns;
        this.checkpointConf = new CheckpointConf(conf);
        this.thread = new CheckpointerThread();
        this.setNameNodeAddresses(conf);
    }

    private void setNameNodeAddresses(Configuration conf) throws IOException {
        String myAddrString = this.getHttpAddress(conf);
        Configuration confForActive = HAUtil.getConfForOtherNode(conf);
        this.activeNNAddress = this.getHttpAddress(confForActive);
        Preconditions.checkArgument(this.checkAddress(this.activeNNAddress), "Bad address for active NN: %s", this.activeNNAddress);
        Preconditions.checkArgument(this.checkAddress(myAddrString), "Bad address for standby NN: %s", myAddrString);
        this.myNNAddress = NetUtils.createSocketAddr(myAddrString);
    }

    private String getHttpAddress(Configuration conf) throws IOException {
        String configuredAddr = DFSUtil.getInfoServer(null, conf, false);
        String hostnameFromRpc = NameNode.getServiceAddress(conf, true).getHostName();
        try {
            return DFSUtil.substituteForWildcardAddress(configuredAddr, hostnameFromRpc);
        }
        catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    private boolean checkAddress(String addrStr) {
        InetSocketAddress addr = NetUtils.createSocketAddr(addrStr);
        return addr.getPort() != 0;
    }

    public void start() {
        LOG.info("Starting standby checkpoint thread...\nCheckpointing active NN at " + this.activeNNAddress + "\n" + "Serving checkpoints at " + this.myNNAddress);
        this.thread.start();
    }

    public void stop() throws IOException {
        this.cancelAndPreventCheckpoints("Stopping checkpointer");
        this.thread.setShouldRun(false);
        this.thread.interrupt();
        try {
            this.thread.join();
        }
        catch (InterruptedException e) {
            LOG.warn("Edit log tailer thread exited with an exception");
            throw new IOException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doCheckpoint() throws InterruptedException, IOException {
        long txid;
        assert (this.canceler != null);
        this.namesystem.writeLockInterruptibly();
        try {
            assert (this.namesystem.getEditLog().isOpenForRead()) : "Standby Checkpointer should only attempt a checkpoint when NN is in standby mode, but the edit logs are in an unexpected state";
            FSImage img = this.namesystem.getFSImage();
            long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId();
            long thisCheckpointTxId = img.getLastAppliedOrWrittenTxId();
            assert (thisCheckpointTxId >= prevCheckpointTxId);
            if (thisCheckpointTxId == prevCheckpointTxId) {
                LOG.info("A checkpoint was triggered but the Standby Node has not received any transactions since the last checkpoint at txid " + thisCheckpointTxId + ". Skipping...");
                return;
            }
            img.saveNamespace(this.namesystem, this.canceler);
            txid = img.getStorage().getMostRecentCheckpointTxId();
            assert (txid == thisCheckpointTxId) : "expected to save checkpoint at txid=" + thisCheckpointTxId + " but instead saved at txid=" + txid;
        }
        finally {
            this.namesystem.writeUnlock();
        }
        TransferFsImage.uploadImageFromStorage(this.activeNNAddress, this.myNNAddress, this.namesystem.getFSImage().getStorage(), txid);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancelAndPreventCheckpoints(String msg) throws ServiceFailedException {
        this.thread.preventCheckpointsFor(120000L);
        Object object = this.cancelLock;
        synchronized (object) {
            if (this.canceler != null) {
                this.canceler.cancel(msg);
            }
        }
    }

    @VisibleForTesting
    static int getCanceledCount() {
        return canceledCount;
    }

    private long countUncheckpointedTxns() {
        FSImage img = this.namesystem.getFSImage();
        return img.getLastAppliedOrWrittenTxId() - img.getStorage().getMostRecentCheckpointTxId();
    }

    @VisibleForTesting
    String getActiveNNAddress() {
        return this.activeNNAddress;
    }

    private class CheckpointerThread
    extends Thread {
        private volatile boolean shouldRun;
        private volatile long preventCheckpointsUntil;

        private CheckpointerThread() {
            super("Standby State Checkpointer");
            this.shouldRun = true;
            this.preventCheckpointsUntil = 0L;
        }

        private void setShouldRun(boolean shouldRun) {
            this.shouldRun = shouldRun;
        }

        @Override
        public void run() {
            SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Object>(){

                @Override
                public Object run() {
                    CheckpointerThread.this.doWork();
                    return null;
                }
            });
        }

        private void preventCheckpointsFor(long delayMs) {
            this.preventCheckpointsUntil = Time.now() + delayMs;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void doWork() {
            StandbyCheckpointer.this.lastCheckpointTime = Time.now();
            while (this.shouldRun) {
                try {
                    Thread.sleep(1000L * StandbyCheckpointer.this.checkpointConf.getCheckPeriod());
                }
                catch (InterruptedException ie) {
                    // empty catch block
                }
                if (!this.shouldRun) break;
                try {
                    if (UserGroupInformation.isSecurityEnabled()) {
                        UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
                    }
                    long now = Time.now();
                    long uncheckpointed = StandbyCheckpointer.this.countUncheckpointedTxns();
                    long secsSinceLast = (now - StandbyCheckpointer.this.lastCheckpointTime) / 1000L;
                    boolean needCheckpoint = false;
                    if (uncheckpointed >= StandbyCheckpointer.this.checkpointConf.getTxnCount()) {
                        LOG.info("Triggering checkpoint because there have been " + uncheckpointed + " txns since the last checkpoint, which " + "exceeds the configured threshold " + StandbyCheckpointer.this.checkpointConf.getTxnCount());
                        needCheckpoint = true;
                    } else if (secsSinceLast >= StandbyCheckpointer.this.checkpointConf.getPeriod()) {
                        LOG.info("Triggering checkpoint because it has been " + secsSinceLast + " seconds since the last checkpoint, which " + "exceeds the configured interval " + StandbyCheckpointer.this.checkpointConf.getPeriod());
                        needCheckpoint = true;
                    }
                    Object object = StandbyCheckpointer.this.cancelLock;
                    synchronized (object) {
                        block40: {
                            if (now >= this.preventCheckpointsUntil) break block40;
                            LOG.info("But skipping this checkpoint since we are about to failover!");
                            canceledCount++;
                            continue;
                        }
                        assert (StandbyCheckpointer.this.canceler == null);
                        StandbyCheckpointer.this.canceler = new Canceler();
                    }
                    if (!needCheckpoint) continue;
                    StandbyCheckpointer.this.doCheckpoint();
                    StandbyCheckpointer.this.lastCheckpointTime = now;
                }
                catch (SaveNamespaceCancelledException ce) {
                    LOG.info("Checkpoint was cancelled: " + ce.getMessage());
                    canceledCount++;
                }
                catch (InterruptedException ie) {
                    Object object = StandbyCheckpointer.this.cancelLock;
                    synchronized (object) {
                        StandbyCheckpointer.this.canceler = null;
                    }
                }
                catch (Throwable t) {
                    LOG.error("Exception in doCheckpoint", t);
                }
                finally {
                    Object ce = StandbyCheckpointer.this.cancelLock;
                    synchronized (ce) {
                        StandbyCheckpointer.this.canceler = null;
                    }
                }
            }
        }
    }
}

