/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.core.oracle;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.fluo.api.exceptions.FluoException;
import org.apache.fluo.core.impl.CuratorCnxnListener;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.metrics.MetricsUtil;
import org.apache.fluo.core.oracle.Stamp;
import org.apache.fluo.core.thrift.OracleService;
import org.apache.fluo.core.thrift.Stamps;
import org.apache.fluo.core.util.CuratorUtil;
import org.apache.fluo.core.util.UtilWaitThread;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OracleClient
implements AutoCloseable {
    public static final Logger log = LoggerFactory.getLogger(OracleClient.class);
    /** Limit, in seconds, for the wait period of the back-off loop in {@link #getStamp()}. */
    private static final int MAX_ORACLE_WAIT_PERIOD = 60;
    /** Times each round trip to the oracle made by the retriever thread. */
    private final Timer responseTimer;
    /** Records how many stamps were requested per oracle call. */
    private final Histogram stampsHistogram;
    /**
     * Participant currently believed to be the oracle leader, or {@code null} when unknown.
     *
     * <p>Writers hold the {@code TimestampRetriever} monitor while {@link #getOracle()} holds the
     * {@code OracleClient} monitor, so the two sides never share a lock; {@code volatile} is what
     * guarantees that a reader sees the most recent write.
     */
    private volatile Participant currentLeader;
    private final Environment env;
    /** Pending stamp requests, drained in batches by the retriever thread. */
    private final ArrayBlockingQueue<TimeRequest> queue = new ArrayBlockingQueue<TimeRequest>(10000);
    /** Daemon thread that runs {@link #timestampRetriever}. */
    private final Thread thread;
    /** Set once by {@link #close()}; never reassigned, hence final. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final TimestampRetriever timestampRetriever;

    public OracleClient(Environment env) {
        this.env = env;
        this.responseTimer = MetricsUtil.getTimer(env.getConfiguration(), env.getSharedResources().getMetricRegistry(), env.getMetricNames().getOracleResponseTime());
        this.stampsHistogram = MetricsUtil.getHistogram(env.getConfiguration(), env.getSharedResources().getMetricRegistry(), env.getMetricNames().getOracleClientStamps());
        this.timestampRetriever = new TimestampRetriever();
        this.thread = new Thread(this.timestampRetriever);
        this.thread.setDaemon(true);
        this.thread.start();
    }

    /**
     * Requests a single stamp from the oracle and blocks until the retriever thread delivers it.
     *
     * <p>If the configured client retry timeout is negative the call waits indefinitely, logging
     * a warning after each unsuccessful wait and doubling the wait period up to
     * {@code MAX_ORACLE_WAIT_PERIOD} seconds. Otherwise it waits at most that many milliseconds.
     *
     * @return the stamp assigned to this request
     * @throws IllegalStateException if this client is closed, either on entry or while waiting
     * @throws FluoException if the timeout elapses or the calling thread is interrupted; in the
     *         latter case the thread's interrupt status is restored before throwing
     */
    public Stamp getStamp() {
        this.checkClosed();
        TimeRequest tr = new TimeRequest();
        try {
            this.queue.put(tr);
            int timeout = this.env.getConfiguration().getClientRetryTimeout();
            if (timeout < 0) {
                long waitPeriod = 1L;
                long waitTotal = 0L;
                while (!tr.cdl.await(waitPeriod, TimeUnit.SECONDS)) {
                    this.checkClosed();
                    waitTotal += waitPeriod;
                    // Exponential back-off, clamped so the period never exceeds the documented
                    // maximum (plain doubling would overshoot from 32s to 64s).
                    waitPeriod = Math.min(waitPeriod * 2L, (long) MAX_ORACLE_WAIT_PERIOD);
                    log.warn("Waiting for timestamp from Oracle. Is it running? waitTotal={}s waitPeriod={}s", waitTotal, waitPeriod);
                }
            } else if (!tr.cdl.await(timeout, TimeUnit.MILLISECONDS)) {
                throw new FluoException("Timed out (after " + timeout + "ms) trying to retrieve timestamp from Oracle.  Is the Oracle running?");
            }
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new FluoException("Interrupted while retrieving timestamp from Oracle", e);
        }
        return tr.stampRef.get();
    }

    /**
     * Queues a stamp request and returns a future that the retriever thread completes once the
     * oracle has answered.
     *
     * @return a future yielding the stamp assigned to this request
     * @throws IllegalStateException if this client is closed
     * @throws RuntimeException if the calling thread is interrupted while enqueueing; the
     *         thread's interrupt status is restored before throwing
     */
    public ListenableFuture<Stamp> getStampAsync() {
        this.checkClosed();
        TimeRequest tr = new TimeRequest();
        // The request doubles as the task's Callable: running the task publishes stampRef.
        ListenableFutureTask<Stamp> lf = ListenableFutureTask.create(tr);
        tr.lf = lf;
        try {
            this.queue.put(tr);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return lf;
    }

    /**
     * Returns the id of the participant currently recorded as oracle leader.
     *
     * @return the leader's id, or {@code null} when no leader is currently known
     * @throws IllegalStateException if this client is closed
     */
    public synchronized String getOracle() {
        this.checkClosed();
        // Read the field exactly once: it is updated under the TimestampRetriever monitor, not
        // this one, so it may be set to null between a null check and a second read.
        Participant leader = this.currentLeader;
        return leader != null ? leader.getId() : null;
    }

    /**
     * Guards public entry points against use after {@link #close()}.
     *
     * @throws IllegalStateException if the closed flag is set
     */
    private void checkClosed() {
        if (!this.closed.get()) {
            return;
        }
        throw new IllegalStateException(OracleClient.class.getSimpleName() + " is closed");
    }

    /**
     * Marks the client closed, interrupts the retriever thread, waits for it to finish and then
     * releases the retriever's connections. Only the first call performs the shutdown; later or
     * concurrent calls return immediately.
     *
     * @throws FluoException if the calling thread is interrupted while waiting for the retriever
     *         thread; the interrupt status is restored, and the retriever's resources are not
     *         released in that case (unchanged from the previous behaviour)
     */
    @Override
    public void close() {
        // Atomic flip so that two racing callers cannot both run the shutdown sequence.
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        try {
            this.thread.interrupt();
            this.thread.join();
            this.timestampRetriever.close();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new FluoException("Interrupted during close", e);
        }
    }

    private class TimestampRetriever
    extends LeaderSelectorListenerAdapter
    implements Runnable,
    PathChildrenCacheListener {
        // Created in run(); in this class it is only queried via getLeader(). Nulled by close().
        private LeaderSelector leaderSelector;
        // Curator client created and started in run(); closed and nulled by close().
        private CuratorFramework curatorFramework;
        // Thrift client for the oracle; replaced every time connect() succeeds.
        private OracleService.Client client;
        // Cache on "/oracle/server" whose events are delivered to childEvent().
        private PathChildrenCache pathChildrenCache;
        // Transport underlying `client`; opened in connect(), closed in reconnect()/close().
        private TTransport transport;

        // All state is initialised lazily in run(), so there is nothing to do here.
        private TimestampRetriever() {
        }

        /**
         * Thread entry point: connects to ZooKeeper, starts the children cache, creates the
         * leader selector, connects to the oracle and then serves stamp requests via
         * {@code doWork()} until the client is closed.
         *
         * <p>Setup runs while holding this object's monitor, which the synchronized
         * {@code close()} method also needs. Any exception ends the thread; it is logged at
         * error level unless the client is already closed, in which case it is logged at debug
         * level only.
         */
        @Override
        public void run() {
            try {
                synchronized (this) {
                    if (OracleClient.this.closed.get()) {
                        return;
                    }
                    this.curatorFramework = CuratorUtil.newAppCurator(OracleClient.this.env.getConfiguration());
                    CuratorCnxnListener cnxnListener = new CuratorCnxnListener();
                    this.curatorFramework.getConnectionStateListenable().addListener(cnxnListener);
                    this.curatorFramework.start();
                    // Poll until the connection listener reports a live ZooKeeper session.
                    while (!cnxnListener.isConnected()) {
                        Thread.sleep(200L);
                    }
                    this.pathChildrenCache = new PathChildrenCache(this.curatorFramework, "/oracle/server", true);
                    this.pathChildrenCache.getListenable().addListener(this);
                    this.pathChildrenCache.start();
                    this.leaderSelector = new LeaderSelector(this.curatorFramework, "/oracle/server", (LeaderSelectorListener)this);
                    this.connect();
                }
                this.doWork();
            }
            catch (Exception e) {
                // Log once: failures during shutdown are expected and only worth debug level.
                if (!OracleClient.this.closed.get()) {
                    log.error("Exception occurred in run() method", (Throwable)e);
                } else {
                    log.debug("Exception occurred in run() method", (Throwable)e);
                }
            }
        }

        /**
         * Children-cache callback: when a child under the watched path is added, removed or
         * updated, re-reads the leader from the selector and records it as the current leader
         * (or {@code null} if the reported participant is not a leader).
         *
         * <p>The leader is fetched once, under this object's monitor, so the participant that
         * is checked is the same one that is stored. Events that arrive before the selector has
         * been created or after {@code close()} has cleared it are ignored.
         *
         * @param curatorFramework the client that produced the event (unused)
         * @param event the cache event
         * @throws Exception if the leader lookup fails
         */
        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
            PathChildrenCacheEvent.Type type = event.getType();
            boolean membershipChanged = type == PathChildrenCacheEvent.Type.CHILD_REMOVED
                    || type == PathChildrenCacheEvent.Type.CHILD_ADDED
                    || type == PathChildrenCacheEvent.Type.CHILD_UPDATED;
            if (!membershipChanged) {
                return;
            }
            synchronized (this) {
                LeaderSelector selector = this.leaderSelector;
                if (selector == null) {
                    // Not initialised yet, or already closed: nothing to update.
                    return;
                }
                Participant participant = selector.getLeader();
                OracleClient.this.currentLeader = this.isLeader(participant) ? participant : null;
            }
        }

        /**
         * Request-serving loop. Each pass takes at least one queued request (polling in
         * one-second slices so that the closed flag is noticed), drains whatever else is queued,
         * asks the oracle for that many stamps in one call and hands consecutive stamps to the
         * requests in queue order. Returns only once the client is closed.
         *
         * <p>The oracle call is retried until it succeeds: a transport failure triggers a
         * reconnect, any other Thrift failure is logged and retried, and a leader change
         * observed across the call triggers a reconnect followed by a fresh call. Exceptions
         * that escape the retry loop are logged and the outer loop starts a new pass.
         */
        private void doWork() {
            ArrayList<TimeRequest> batch = new ArrayList<TimeRequest>();
            while (true) {
                try {
                    batch.clear();

                    // Block for the first request, re-checking the closed flag every second.
                    TimeRequest first = null;
                    while (first == null) {
                        if (OracleClient.this.closed.get()) {
                            return;
                        }
                        first = OracleClient.this.queue.poll(1L, TimeUnit.SECONDS);
                    }
                    batch.add(first);
                    OracleClient.this.queue.drainTo(batch);

                    long txStampsStart;
                    long gcStamp;
                    while (true) {
                        try {
                            String leaderBefore;
                            OracleService.Client localClient;
                            synchronized (this) {
                                leaderBefore = OracleClient.this.getOracle();
                                localClient = this.client;
                            }
                            Timer.Context timerContext = OracleClient.this.responseTimer.time();
                            Stamps stamps = localClient.getTimestamps(OracleClient.this.env.getFluoApplicationID(), batch.size());
                            txStampsStart = stamps.txStampsStart;
                            gcStamp = stamps.gcStamp;

                            // A different leader after the call means the answer came from a
                            // stale oracle: reconnect and ask again.
                            String leaderAfter = OracleClient.this.getOracle();
                            if (leaderAfter != null && !leaderAfter.equals(leaderBefore)) {
                                this.reconnect();
                                continue;
                            }
                            OracleClient.this.stampsHistogram.update(batch.size());
                            timerContext.close();
                            break;
                        }
                        catch (TTransportException tte) {
                            log.info("Oracle connection lost. Retrying...");
                            this.reconnect();
                        }
                        catch (TException e) {
                            log.error("TException occurred in doWork() method", (Throwable)e);
                        }
                    }

                    // Hand out consecutive stamps and wake whoever is waiting on each request.
                    for (int i = 0; i < batch.size(); ++i) {
                        TimeRequest pending = batch.get(i);
                        pending.stampRef.set(new Stamp(txStampsStart + (long)i, gcStamp));
                        if (pending.lf != null) {
                            pending.lf.run();
                        } else {
                            pending.cdl.countDown();
                        }
                    }
                }
                catch (InterruptedException e) {
                    if (OracleClient.this.closed.get()) {
                        log.debug("InterruptedException occurred in doWork() method", (Throwable)e);
                    } else {
                        log.error("InterruptedException occurred in doWork() method", (Throwable)e);
                    }
                }
                catch (Exception e) {
                    log.error("Exception occurred in doWork() method", (Throwable)e);
                }
            }
        }

        /**
         * Resolves the current oracle leader and opens a Thrift connection to it, retrying
         * (after a randomised sleep and a fresh leader lookup) for as long as the transport
         * cannot be opened.
         *
         * <p>The leader id is expected to have the form {@code host:port}.
         *
         * @throws RuntimeException wrapping any non-transport exception raised while connecting
         */
        private synchronized void connect() throws IOException, KeeperException, InterruptedException, TTransportException {
            this.getLeader();
            while (true) {
                String leaderId = OracleClient.this.currentLeader.getId();
                log.debug("Connecting to oracle at {}", leaderId);
                String[] hostAndPort = leaderId.split(":");
                String host = hostAndPort[0];
                int port = Integer.parseInt(hostAndPort[1]);
                try {
                    this.transport = new TFastFramedTransport((TTransport)new TSocket(host, port));
                    this.transport.open();
                    TCompactProtocol protocol = new TCompactProtocol(this.transport);
                    this.client = new OracleService.Client((TProtocol)protocol);
                    log.info("Connected to oracle at {}", OracleClient.this.getOracle());
                    return;
                }
                catch (TTransportException e) {
                    // Record why the attempt failed instead of dropping it silently, then
                    // re-resolve the leader, which may have moved.
                    log.debug("Failed to connect to oracle at {}; retrying", leaderId, e);
                    this.sleepRandom();
                    this.getLeader();
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }

        /**
         * Closes the current transport if it is open and establishes a new connection through
         * {@code connect()}.
         *
         * <p>The transport may be {@code null} (never connected, or cleared by {@code close()}),
         * so it is null-checked in the same way {@code close()} does.
         */
        private synchronized void reconnect() throws InterruptedException, TTransportException, KeeperException, IOException {
            if (this.transport != null && this.transport.isOpen()) {
                this.transport.close();
            }
            this.connect();
        }

        /**
         * Releases the Thrift transport, the children cache and the Curator client, then clears
         * the corresponding fields.
         *
         * <p>Each step runs even if an earlier one throws, so a failure to close one resource
         * cannot leak the others; the fields are always reset.
         *
         * @throws RuntimeException wrapping an {@link IOException} from closing the cache
         */
        private synchronized void close() {
            try {
                if (this.transport != null && this.transport.isOpen()) {
                    this.transport.close();
                }
            }
            finally {
                try {
                    if (this.pathChildrenCache != null) {
                        this.pathChildrenCache.close();
                    }
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
                finally {
                    try {
                        if (this.curatorFramework != null) {
                            this.curatorFramework.close();
                        }
                    }
                    finally {
                        this.transport = null;
                        this.pathChildrenCache = null;
                        this.leaderSelector = null;
                        this.curatorFramework = null;
                    }
                }
            }
        }

        /**
         * Asks the leader selector for the current leader once and, if the answer really is a
         * leader, records it as the client's current leader.
         *
         * @return {@code true} if a leader was found and stored, {@code false} otherwise
         *         (including when the lookup failed with a {@code KeeperException})
         * @throws RuntimeException wrapping any other exception from the lookup
         */
        private boolean getLeaderAttempt() {
            Participant candidate;
            try {
                candidate = this.leaderSelector.getLeader();
            }
            catch (KeeperException e) {
                log.debug("Exception throw in getLeaderAttempt()", (Throwable)e);
                candidate = null;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            if (!this.isLeader(candidate)) {
                return false;
            }
            OracleClient.this.currentLeader = candidate;
            return true;
        }

        /**
         * Repeats {@code getLeaderAttempt()} until it succeeds, sleeping a randomised interval
         * after every failed attempt.
         */
        private void getLeader() {
            while (!this.getLeaderAttempt()) {
                this.sleepRandom();
            }
        }

        /**
         * Sleeps through {@code UtilWaitThread.sleep} for 100 ms plus a random extra of up to
         * one second, passing along the client's closed flag.
         */
        private void sleepRandom() {
            long jitterMillis = (long)(1000.0 * Math.random());
            long sleepMillis = 100L + jitterMillis;
            UtilWaitThread.sleep(sleepMillis, OracleClient.this.closed);
        }

        /**
         * Null-safe leader check.
         *
         * @param participant the participant to test; may be {@code null}
         * @return {@code true} only if {@code participant} is non-null and reports being leader
         */
        private boolean isLeader(Participant participant) {
            if (participant == null) {
                return false;
            }
            return participant.isLeader();
        }

        /**
         * Intentionally empty leadership callback.
         *
         * <p>Within this class the leader selector is only queried through {@code getLeader()};
         * nothing visible here starts it, so this callback is presumably never invoked
         * (NOTE(review): confirm no other code starts the selector).
         */
        public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
        }
    }

    /**
     * One pending stamp request. The retriever thread stores the result in {@link #stampRef}
     * and then signals completion: through {@link #cdl} when {@link #lf} is {@code null}
     * (blocking callers), otherwise by running {@link #lf}, which invokes {@link #call()}.
     */
    private static final class TimeRequest
    implements Callable<Stamp> {
        /** Released once the stamp is available; awaited by blocking callers. */
        final CountDownLatch cdl = new CountDownLatch(1);
        /** Holds the stamp once the oracle has answered; empty until then. */
        final AtomicReference<Stamp> stampRef = new AtomicReference<Stamp>();
        /** Future task for asynchronous callers; stays {@code null} for blocking requests. */
        ListenableFutureTask<Stamp> lf = null;

        private TimeRequest() {
        }

        /**
         * Returns whatever stamp has been stored so far.
         *
         * @return the stored stamp, or {@code null} if none has been set yet
         */
        @Override
        public Stamp call() throws Exception {
            return this.stampRef.get();
        }
    }
}

