/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.core.oracle;

import com.codahale.metrics.Histogram;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundPathAndBytesable;
import org.apache.curator.framework.api.WatchPathable;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.fluo.accumulo.util.LongUtil;
import org.apache.fluo.core.impl.CuratorCnxnListener;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.FluoConfigurationImpl;
import org.apache.fluo.core.metrics.MetricsUtil;
import org.apache.fluo.core.thrift.OracleService;
import org.apache.fluo.core.thrift.Stamps;
import org.apache.fluo.core.util.CuratorUtil;
import org.apache.fluo.core.util.Halt;
import org.apache.fluo.core.util.HostUtil;
import org.apache.fluo.core.util.PortUtils;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OracleServer
extends LeaderSelectorListenerAdapter
implements OracleService.Iface,
PathChildrenCacheListener {
    private static final Logger log = LoggerFactory.getLogger(OracleServer.class);
    private final Histogram stampsHistogram;
    public static final long ORACLE_MAX_READ_BUFFER_BYTES = 2048L;
    private final Environment env;
    private Thread serverThread;
    private THsHaServer server;
    private volatile long currentTs = 0L;
    private volatile long maxTs = 0L;
    private volatile boolean started = false;
    private int port = 0;
    private LeaderSelector leaderSelector;
    private PathChildrenCache pathChildrenCache;
    private CuratorFramework curatorFramework;
    private CuratorCnxnListener cnxnListener;
    private Participant currentLeader;
    private final String maxTsPath;
    private final String oraclePath;
    private volatile boolean isLeader = false;
    private GcTimestampTracker gcTsTracker;

    public OracleServer(Environment env) throws Exception {
        this.env = env;
        this.stampsHistogram = MetricsUtil.getHistogram(env.getConfiguration(), env.getSharedResources().getMetricRegistry(), env.getMetricNames().getOracleServerStamps());
        this.cnxnListener = new CuratorCnxnListener();
        this.maxTsPath = "/oracle/max-timestamp";
        this.oraclePath = "/oracle/server";
    }

    private void allocateTimestamp() throws Exception {
        Stat stat = new Stat();
        byte[] d = (byte[])((WatchPathable)this.curatorFramework.getData().storingStatIn(stat)).forPath(this.maxTsPath);
        long newMax = Long.parseLong(new String(d)) + 1000L;
        ((BackgroundPathAndBytesable)this.curatorFramework.setData().withVersion(stat.getVersion())).forPath(this.maxTsPath, LongUtil.toByteArray((Long)newMax));
        this.maxTs = newMax;
        if (!this.isLeader) {
            throw new IllegalStateException();
        }
    }

    @Override
    public Stamps getTimestamps(String id, int num) throws TException {
        long start = this.getTimestampsImpl(id, num);
        this.stampsHistogram.update(num);
        return new Stamps(start, this.gcTsTracker.advertisedGcTimetamp);
    }

    private synchronized long getTimestampsImpl(String id, int num) throws TException {
        if (!this.started) {
            throw new IllegalStateException("Received timestamp request but Oracle has not started");
        }
        if (!id.equals(this.env.getFluoApplicationID())) {
            throw new IllegalArgumentException("Received timestamp request with a Fluo application ID [" + id + "] that does not match the application ID [" + this.env.getFluoApplicationID() + "] of the Oracle");
        }
        if (!this.isLeader) {
            throw new IllegalStateException("Received timestamp request but Oracle is not leader");
        }
        try {
            while ((long)num + this.currentTs >= this.maxTs) {
                this.allocateTimestamp();
            }
            long tmp = this.currentTs;
            this.currentTs += (long)num;
            return tmp;
        }
        catch (Exception e) {
            throw new TException((Throwable)e);
        }
    }

    @Override
    public boolean isLeader() throws TException {
        return this.isLeader;
    }

    private boolean isLeader(Participant participant) {
        return participant != null && participant.isLeader();
    }

    @VisibleForTesting
    public int getPort() {
        return this.port;
    }

    @VisibleForTesting
    public boolean isConnected() {
        return this.started && this.cnxnListener.isConnected();
    }

    private InetSocketAddress startServer() throws TTransportException {
        if (this.env.getConfiguration().containsKey("fluo.impl.oracle.port")) {
            this.port = this.env.getConfiguration().getInt("fluo.impl.oracle.port");
            Preconditions.checkArgument((this.port >= 1 && this.port <= 65535 ? 1 : 0) != 0, (Object)"fluo.impl.oracle.port must be valid port (1-65535)");
        } else {
            this.port = PortUtils.getRandomFreePort();
        }
        InetSocketAddress addr = new InetSocketAddress(this.port);
        TNonblockingServerSocket socket = new TNonblockingServerSocket(addr);
        THsHaServer.Args serverArgs = new THsHaServer.Args((TNonblockingServerTransport)socket);
        OracleService.Processor<OracleServer> processor = new OracleService.Processor<OracleServer>(this);
        serverArgs.processor(processor);
        serverArgs.maxReadBufferBytes = 2048L;
        serverArgs.inputProtocolFactory((TProtocolFactory)new TCompactProtocol.Factory());
        serverArgs.outputProtocolFactory((TProtocolFactory)new TCompactProtocol.Factory());
        this.server = new THsHaServer(serverArgs);
        Runnable st = new Runnable(){

            @Override
            public void run() {
                OracleServer.this.server.serve();
            }
        };
        this.serverThread = new Thread(st);
        this.serverThread.setDaemon(true);
        this.serverThread.start();
        return addr;
    }

    public synchronized void start() throws Exception {
        if (this.started) {
            throw new IllegalStateException();
        }
        InetSocketAddress addr = this.startServer();
        this.curatorFramework = CuratorUtil.newAppCurator(this.env.getConfiguration());
        this.curatorFramework.getConnectionStateListenable().addListener((Object)this.cnxnListener);
        this.curatorFramework.start();
        while (!this.cnxnListener.isConnected()) {
            Thread.sleep(200L);
        }
        this.leaderSelector = new LeaderSelector(this.curatorFramework, "/oracle/server", (LeaderSelectorListener)this);
        String leaderId = HostUtil.getHostName() + ":" + addr.getPort();
        this.leaderSelector.setId(leaderId);
        log.info("Leader ID = " + leaderId);
        this.leaderSelector.start();
        this.pathChildrenCache = new PathChildrenCache(this.curatorFramework, this.oraclePath, true);
        this.pathChildrenCache.getListenable().addListener((Object)this);
        this.pathChildrenCache.start();
        while (!this.cnxnListener.isConnected()) {
            Thread.sleep(200L);
        }
        log.info("Listening " + addr);
        this.started = true;
    }

    public synchronized void stop() throws Exception {
        if (this.started) {
            this.server.stop();
            this.serverThread.join();
            if (this.gcTsTracker != null) {
                this.gcTsTracker.stop();
            }
            this.started = false;
            this.currentLeader = null;
            if (this.curatorFramework.getState().equals((Object)CuratorFrameworkState.STARTED)) {
                this.pathChildrenCache.getListenable().removeListener((Object)this);
                this.pathChildrenCache.close();
                this.leaderSelector.close();
                this.curatorFramework.getConnectionStateListenable().removeListener((Object)this);
                this.curatorFramework.close();
            }
            log.info("Oracle server has been stopped.");
        }
    }

    private OracleService.Client getOracleClient(String host, int port) {
        try {
            TFastFramedTransport transport = new TFastFramedTransport((TTransport)new TSocket(host, port));
            transport.open();
            TCompactProtocol protocol = new TCompactProtocol((TTransport)transport);
            log.info("Former leader was reachable at " + host + ":" + port);
            return new OracleService.Client((TProtocol)protocol);
        }
        catch (TTransportException e) {
            log.debug("Exception thrown in getOracleClient()", (Throwable)e);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
        try {
            int port;
            String[] address;
            String host;
            OracleService.Client client;
            if (this.currentLeader != null && (client = this.getOracleClient(host = (address = this.currentLeader.getId().split(":"))[0], port = Integer.parseInt(address[1]))) != null) {
                try {
                    while (client.isLeader()) {
                        Thread.sleep(500L);
                    }
                }
                catch (Exception e) {
                    log.debug("Exception thrown in takeLeadership()", (Throwable)e);
                }
            }
            OracleServer oracleServer = this;
            synchronized (oracleServer) {
                byte[] d = (byte[])curatorFramework.getData().forPath(this.maxTsPath);
                this.currentTs = this.maxTs = LongUtil.fromByteArray((byte[])d).longValue();
            }
            this.gcTsTracker = new GcTimestampTracker();
            this.gcTsTracker.start();
            this.isLeader = true;
            while (this.started) {
                Thread.sleep(100L);
            }
        }
        finally {
            this.isLeader = false;
            if (this.started) {
                Halt.halt("Oracle has lost leadership unexpectedly and is now halting.");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
        block6: {
            try {
                if (!this.isConnected() || !event.getType().equals((Object)PathChildrenCacheEvent.Type.CHILD_ADDED) && !event.getType().equals((Object)PathChildrenCacheEvent.Type.CHILD_REMOVED) && !event.getType().equals((Object)PathChildrenCacheEvent.Type.CHILD_UPDATED)) break block6;
                OracleServer oracleServer = this;
                synchronized (oracleServer) {
                    Participant participant = this.leaderSelector.getLeader();
                    if (this.isLeader(participant) && !this.leaderSelector.hasLeadership()) {
                        this.currentLeader = participant;
                    }
                }
            }
            catch (InterruptedException e) {
                log.warn("Oracle leadership watcher has been interrupted unexpectedly");
            }
        }
    }

    private class GcTimestampTracker {
        private volatile long advertisedGcTimetamp;
        private CuratorFramework curator;
        private Timer timer;

        GcTimestampTracker() throws Exception {
            this.curator = OracleServer.this.env.getSharedResources().getCurator();
        }

        private void updateAdvertisedGcTimestamp(long newTs) throws Exception {
            if (newTs > this.advertisedGcTimetamp && OracleServer.this.isLeader) {
                this.advertisedGcTimetamp = newTs;
                this.curator.setData().forPath("/oracle/gc-timestamp", LongUtil.toByteArray((Long)this.advertisedGcTimetamp));
            }
        }

        private void updateGcTimestamp() throws Exception {
            List children;
            try {
                children = (List)this.curator.getChildren().forPath("/transactor/timestamps");
            }
            catch (KeeperException.NoNodeException nne) {
                children = Collections.emptyList();
            }
            long oldestTs = Long.MAX_VALUE;
            boolean nodeFound = false;
            for (String child : children) {
                Long ts = LongUtil.fromByteArray((byte[])((byte[])this.curator.getData().forPath("/transactor/timestamps/" + child)));
                nodeFound = true;
                if (ts >= oldestTs) continue;
                oldestTs = ts;
            }
            if (nodeFound) {
                this.updateAdvertisedGcTimestamp(oldestTs);
            } else {
                this.updateAdvertisedGcTimestamp(OracleServer.this.currentTs);
            }
        }

        void start() throws Exception {
            this.advertisedGcTimetamp = LongUtil.fromByteArray((byte[])((byte[])this.curator.getData().forPath("/oracle/gc-timestamp")));
            TimerTask tt = new TimerTask(){

                @Override
                public void run() {
                    try {
                        GcTimestampTracker.this.updateGcTimestamp();
                    }
                    catch (Exception e) {
                        log.warn("Failed to update GC timestamp.", (Throwable)e);
                    }
                }
            };
            TimerTask logTask = new TimerTask(){

                @Override
                public void run() {
                    log.info("Current timestamp: {}", (Object)OracleServer.this.currentTs);
                }
            };
            this.timer = new Timer("Oracle gc update timer", true);
            long updatePeriod = OracleServer.this.env.getConfiguration().getLong("fluo.impl.timestamp.update.period", FluoConfigurationImpl.ZK_UPDATE_PERIOD_MS_DEFAULT);
            long nextPeriod = 300000L;
            this.timer.schedule(tt, updatePeriod, updatePeriod);
            this.timer.schedule(logTask, 0L, nextPeriod);
        }

        void stop() {
            if (this.timer != null) {
                this.timer.cancel();
                this.timer = null;
            }
        }
    }
}

