/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.mini;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.File;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.minicluster.MiniAccumuloConfig;
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.mini.MiniFluo;
import org.apache.fluo.core.client.FluoClientImpl;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.FluoConfigurationImpl;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.oracle.OracleServer;
import org.apache.fluo.core.worker.NotificationFinder;
import org.apache.fluo.core.worker.NotificationFinderFactory;
import org.apache.fluo.core.worker.NotificationProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link MiniFluo} implementation that runs an {@link OracleServer}, a
 * {@link NotificationProcessor} and a {@link NotificationFinder} inside the current JVM and,
 * when the configuration asks for it, a {@link MiniAccumuloCluster} as well.
 *
 * <p>All components are started by the constructor and released by {@link #close()}.
 */
public class MiniFluoImpl implements MiniFluo {
    private static final Logger log = LoggerFactory.getLogger(MiniFluoImpl.class);

    /** Counter handed to {@code FluoClientImpl.setupReporters}; shared by every instance. */
    private static final AtomicInteger reporterCounter = new AtomicInteger(1);

    private final Environment env;
    private OracleServer oserver;
    private NotificationProcessor mnp;
    private NotificationFinder notificationFinder;
    private final FluoConfiguration config;

    /** Non-null only when this instance started its own MiniAccumulo cluster. */
    private MiniAccumuloCluster cluster = null;

    /** Accumulo user written into the configuration when MiniAccumulo is started here. */
    protected static String USER = "root";

    /** Password used for the MiniAccumulo cluster and written into the configuration. */
    protected static String PASSWORD = "secret";

    private AutoCloseable reporter;

    /**
     * Returns the path of the client properties file written when MiniAccumulo is started.
     *
     * @param config configuration providing the MiniFluo data directory
     * @return the data directory followed by {@code /client.properties}
     */
    public static String clientPropsPath(FluoConfiguration config) {
        return config.getMiniDataDir() + "/client.properties";
    }

    /**
     * Exposes the notification processor created by the constructor.
     *
     * @return the notification processor used by this instance
     */
    @VisibleForTesting
    public NotificationProcessor getNotificationProcessor() {
        return this.mnp;
    }

    /**
     * Starts all MiniFluo components.
     *
     * <p>Optionally starts MiniAccumulo, overrides the min/max sleep time properties with 50 and
     * 100, then creates the environment, reporters, oracle server, notification processor and
     * notification finder. If any step fails, the components created so far are shut down on a
     * best-effort basis before the failure is rethrown, so that a cluster started here is not
     * left running.
     *
     * @param fluoConfig configuration to use; it is modified by this constructor
     * @throws IllegalArgumentException if {@code fluoConfig} lacks the required MiniFluo
     *         properties
     * @throws RuntimeException wrapping any exception raised during startup; failures raised
     *         while cleaning up are attached as suppressed exceptions
     */
    public MiniFluoImpl(FluoConfiguration fluoConfig) {
        if (!fluoConfig.hasRequiredMiniFluoProps()) {
            throw new IllegalArgumentException("MiniFluo configuration is not valid");
        }
        this.config = fluoConfig;
        try {
            if (this.config.getMiniStartAccumulo()) {
                this.startMiniAccumulo();
            }
            this.config.setProperty(FluoConfigurationImpl.MIN_SLEEP_TIME_PROP, Integer.valueOf(50));
            this.config.setProperty(FluoConfigurationImpl.MAX_SLEEP_TIME_PROP, Integer.valueOf(100));

            this.env = new Environment(this.config);
            this.reporter = FluoClientImpl.setupReporters(this.env, "mini", reporterCounter);

            this.oserver = new OracleServer(this.env);
            this.oserver.start();

            this.mnp = new NotificationProcessor(this.env);
            this.notificationFinder = NotificationFinderFactory.newNotificationFinder((FluoConfiguration)this.env.getConfiguration());
            this.notificationFinder.init(this.env, this.mnp);
            this.notificationFinder.start();
        } catch (Exception e) {
            RuntimeException failure = new RuntimeException(e);
            // Release whatever was started before the failure; the startup error stays primary.
            Exception cleanupFailure = this.shutdown();
            if (cleanupFailure != null) {
                failure.addSuppressed(cleanupFailure);
            }
            throw failure;
        }
    }

    /**
     * Starts a MiniAccumulo cluster in the MiniFluo data directory, points the configuration at
     * it, initializes Fluo through {@link FluoAdmin} and saves the client configuration to the
     * file named by {@link #clientPropsPath(FluoConfiguration)}.
     *
     * @throws RuntimeException wrapping any exception raised along the way
     */
    private void startMiniAccumulo() {
        try {
            File dataDir = new File(this.config.getMiniDataDir());
            this.cluster = new MiniAccumuloCluster(new MiniAccumuloConfig(dataDir, PASSWORD));
            this.cluster.start();
            log.debug("Started MiniAccumulo(accumulo={} zk={})", this.cluster.getInstanceName(), this.cluster.getZooKeepers());

            this.config.setAccumuloInstance(this.cluster.getInstanceName());
            this.config.setAccumuloUser(USER);
            this.config.setAccumuloPassword(PASSWORD);
            this.config.setAccumuloZookeepers(this.cluster.getZooKeepers());
            this.config.setInstanceZookeepers(this.cluster.getZooKeepers() + "/fluo");

            // Fall back to a default table name when none (or only whitespace) is configured.
            boolean tableConfigured = this.config.containsKey("fluo.admin.accumulo.table")
                && !this.config.getAccumuloTable().trim().isEmpty();
            if (!tableConfigured) {
                this.config.setAccumuloTable("fluo");
            }

            FluoAdmin.InitializationOptions opts = new FluoAdmin.InitializationOptions();
            try (FluoAdmin admin = FluoFactory.newAdmin(this.config)) {
                admin.initialize(opts);
            }

            File miniProps = new File(MiniFluoImpl.clientPropsPath(this.config));
            this.config.getClientConfiguration().save(miniProps);
            log.debug("Wrote MiniFluo client properties to {}", miniProps.getAbsolutePath());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the client view of the configuration used by this instance.
     *
     * @return the result of {@code getClientConfiguration()} on the underlying configuration
     */
    public SimpleConfiguration getClientConfiguration() {
        return this.config.getClientConfiguration();
    }

    /**
     * Stops the notification finder, notification processor, oracle server, environment,
     * reporters and, if this instance started one, the MiniAccumulo cluster.
     *
     * <p>Every component is attempted even when an earlier one fails, so one failing component
     * does not leave the others running.
     *
     * @throws RuntimeException wrapping the first failure; later failures are attached to that
     *         cause as suppressed exceptions
     */
    public void close() {
        Exception failure = this.shutdown();
        if (failure != null) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Best-effort shutdown of every component that has been created, in the order: finder,
     * processor, oracle server, environment, reporters, cluster. Components that are still
     * {@code null} (startup did not get that far) are skipped.
     *
     * @return the first exception raised, carrying any later ones as suppressed exceptions, or
     *         {@code null} if every step succeeded
     */
    private Exception shutdown() {
        Exception failure = null;
        if (this.notificationFinder != null) {
            try {
                this.notificationFinder.stop();
            } catch (Exception e) {
                failure = MiniFluoImpl.chain(failure, e);
            }
        }
        if (this.mnp != null) {
            try {
                this.mnp.close();
            } catch (Exception e) {
                failure = MiniFluoImpl.chain(failure, e);
            }
        }
        if (this.oserver != null) {
            try {
                this.oserver.stop();
            } catch (Exception e) {
                failure = MiniFluoImpl.chain(failure, e);
            }
        }
        if (this.env != null) {
            try {
                this.env.close();
            } catch (Exception e) {
                failure = MiniFluoImpl.chain(failure, e);
            }
        }
        if (this.reporter != null) {
            try {
                this.reporter.close();
            } catch (Exception e) {
                failure = MiniFluoImpl.chain(failure, e);
            }
        }
        if (this.cluster != null) {
            try {
                this.cluster.stop();
            } catch (Exception e) {
                failure = MiniFluoImpl.chain(failure, e);
            }
        }
        return failure;
    }

    /**
     * Combines two shutdown failures, keeping the earliest as the primary exception.
     *
     * @param first the failure recorded so far, or {@code null} if there is none
     * @param next the failure that just occurred
     * @return {@code next} when there was no earlier failure, otherwise {@code first} with
     *         {@code next} added as a suppressed exception
     */
    private static Exception chain(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        // Throwable.addSuppressed rejects self-suppression, so skip the identical instance.
        if (first != next) {
            first.addSuppressed(next);
        }
        return first;
    }

    /**
     * Blocks until a scan configured by {@code Notification.configureScanner} returns no entries
     * and the oracle timestamp fetched after the scan is exactly one greater than the timestamp
     * fetched before it.
     *
     * <p>Between attempts the thread sleeps uninterruptibly for half the number of entries seen,
     * in milliseconds, clamped to the range 10 to 10000.
     *
     * @throws RuntimeException wrapping any exception raised while scanning or fetching
     *         timestamps
     */
    public void waitForObservers() {
        try {
            Scanner scanner = this.env.getConnector().createScanner(this.env.getTable(), this.env.getAuthorizations());
            try {
                Notification.configureScanner(scanner);
                while (true) {
                    long ts1 = this.env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();
                    long ntfyCount = Iterables.size(scanner);
                    long ts2 = this.env.getSharedResources().getOracleClient().getStamp().getTxTimestamp();

                    // Consecutive timestamps presumably mean nothing else requested a timestamp
                    // while the scan was running - TODO confirm against the oracle's contract.
                    if (ntfyCount == 0L && ts1 == ts2 - 1L) {
                        break;
                    }

                    long sleepTime = Math.min(Math.max(10L, ntfyCount / 2L), 10000L);
                    Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
                }
            } finally {
                // Release the scanner once waiting is over, whether it succeeded or failed.
                scanner.close();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

