/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.core.worker.finder.hash;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
import org.apache.curator.utils.ZKPaths;
import org.apache.fluo.accumulo.iterators.NotificationHashFilter;
import org.apache.fluo.accumulo.util.NotificationUtil;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.util.ByteUtil;
import org.apache.fluo.core.util.UtilWaitThread;
import org.apache.fluo.core.worker.NotificationFinder;
import org.apache.fluo.core.worker.NotificationProcessor;
import org.apache.fluo.core.worker.TxResult;
import org.apache.fluo.core.worker.finder.hash.ModulusParams;
import org.apache.fluo.core.worker.finder.hash.ScanTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HashNotificationFinder
implements NotificationFinder {
    private NotificationProcessor notificationProcessor;
    private CuratorFramework curator;
    private List<String> finders = Collections.emptyList();
    private int updates = 0;
    private ModulusParams modParams;
    private Environment env;
    private AtomicBoolean stopped = new AtomicBoolean(false);
    private Thread scanThread;
    private PathChildrenCache childrenCache;
    private PersistentEphemeralNode myESNode;
    private static final Logger log = LoggerFactory.getLogger(HashNotificationFinder.class);

    private synchronized void updateFinders() {
        String me = this.myESNode.getActualPath();
        while (me == null) {
            UtilWaitThread.sleep(100L);
            me = this.myESNode.getActualPath();
        }
        me = ZKPaths.getNodeFromPath((String)me);
        ArrayList<String> children = new ArrayList<String>();
        for (ChildData childData : this.childrenCache.getCurrentData()) {
            children.add(ZKPaths.getNodeFromPath((String)childData.getPath()));
        }
        Collections.sort(children);
        if (!this.finders.equals(children)) {
            int index = children.indexOf(me);
            if (index == -1) {
                this.modParams = null;
                this.finders = Collections.emptyList();
                log.debug("Did not find self in list of finders " + me);
            } else {
                ++this.updates;
                this.modParams = new ModulusParams(children.indexOf(me), children.size(), this.updates);
                this.finders = children;
                log.debug("updated modulus params " + this.modParams.remainder + " " + this.modParams.divisor);
            }
        }
    }

    synchronized ModulusParams getModulusParams() {
        return this.modParams;
    }

    @Override
    public void init(Environment env, NotificationProcessor notificationProcessor) {
        Preconditions.checkState((this.notificationProcessor == null ? 1 : 0) != 0);
        this.notificationProcessor = notificationProcessor;
        this.env = env;
        this.curator = env.getSharedResources().getCurator();
        try {
            this.myESNode = new PersistentEphemeralNode(this.curator, PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, "/finders/f-", new byte[0]);
            this.myESNode.start();
            this.myESNode.waitForInitialCreate(1L, TimeUnit.MINUTES);
            this.childrenCache = new PathChildrenCache(env.getSharedResources().getCurator(), "/finders", false);
            this.childrenCache.getListenable().addListener((Object)new FindersListener());
            this.childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
            this.updateFinders();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void start() {
        this.scanThread = new Thread(new ScanTask(this, this.env, this.stopped));
        this.scanThread.setName(this.getClass().getSimpleName() + " " + ScanTask.class.getSimpleName());
        this.scanThread.setDaemon(true);
        this.scanThread.start();
    }

    @Override
    public void stop() {
        this.stopped.set(true);
        try {
            this.childrenCache.close();
        }
        catch (IOException e1) {
            log.warn("Failed to close children cache", (Throwable)e1);
        }
        try {
            this.myESNode.close();
        }
        catch (IOException e1) {
            log.warn("Failed to close ephemeral node", (Throwable)e1);
        }
        this.scanThread.interrupt();
        try {
            this.scanThread.join();
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void failedToProcess(Notification notification, TxResult status) {
    }

    NotificationProcessor getWorkerQueue() {
        return this.notificationProcessor;
    }

    @VisibleForTesting
    static boolean shouldProcess(Notification notification, int divisor, int remainder) {
        byte[] cfcq = NotificationUtil.encodeCol((Column)notification.getColumn());
        return NotificationHashFilter.accept((ByteSequence)ByteUtil.toByteSequence(notification.getRow()), (ByteSequence)new ArrayByteSequence(cfcq), (int)divisor, (int)remainder);
    }

    @Override
    public boolean shouldProcess(Notification notification) {
        ModulusParams mp = this.getModulusParams();
        return HashNotificationFinder.shouldProcess(notification, mp.divisor, mp.remainder);
    }

    private class FindersListener
    implements PathChildrenCacheListener {
        private FindersListener() {
        }

        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            switch (event.getType()) {
                case CHILD_ADDED: 
                case CHILD_REMOVED: {
                    if (HashNotificationFinder.this.stopped.get()) break;
                    HashNotificationFinder.this.updateFinders();
                    break;
                }
                case CHILD_UPDATED: {
                    log.warn("unexpected event " + event);
                    break;
                }
            }
        }
    }

    static class ModParamsChangedException
    extends RuntimeException {
        private static final long serialVersionUID = 1L;

        ModParamsChangedException() {
        }
    }
}

