/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.core.worker.finder.hash;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.fluo.accumulo.iterators.NotificationHashFilter;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.FluoConfigurationImpl;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.util.UtilWaitThread;
import org.apache.fluo.core.worker.TabletInfoCache;
import org.apache.fluo.core.worker.finder.hash.HashNotificationFinder;
import org.apache.fluo.core.worker.finder.hash.ModulusParams;
import org.apache.fluo.core.worker.finder.hash.TabletData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background task that repeatedly scans the tablets reported by a {@link TabletInfoCache} for
 * notifications and hands them to the worker queue of the owning {@link HashNotificationFinder}.
 *
 * <p>Each scan is configured with a {@link NotificationHashFilter} using the finder's current
 * {@link ModulusParams}. If those parameters change while a scan is running, the queue is cleared
 * and the task waits until the parameters have stopped changing before scanning again.
 *
 * <p>The task runs until the {@code stopped} flag passed to the constructor becomes {@code true}.
 */
public class ScanTask
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(ScanTask.class);
    private final HashNotificationFinder hwf;
    private final Random rand = new Random();
    private final AtomicBoolean stopped;
    private final TabletInfoCache<TabletData, Supplier<TabletData>> tabletInfoCache;
    private final Environment env;
    /** Milliseconds the modulus params must remain unchanged before scanning resumes. */
    static long STABILIZE_TIME = 10000L;
    private final long minSleepTime;
    private final long maxSleepTime;

    /**
     * Creates a scan task.
     *
     * @param hashWorkFinder finder supplying the modulus params and the worker queue
     * @param env environment used to reach the table, connector, configuration and shared resources
     * @param stopped flag that, once set to {@code true}, makes {@link #run()} return
     */
    ScanTask(HashNotificationFinder hashWorkFinder, Environment env, AtomicBoolean stopped) {
        this.hwf = hashWorkFinder;
        this.tabletInfoCache = new TabletInfoCache<TabletData, Supplier<TabletData>>(env, TabletData::new);
        this.env = env;
        this.stopped = stopped;
        // Sleep bounds in milliseconds; defaults are 5 seconds and 5 minutes.
        this.minSleepTime = env.getConfiguration().getInt(FluoConfigurationImpl.MIN_SLEEP_TIME_PROP, 5000);
        this.maxSleepTime = env.getConfiguration().getInt(FluoConfigurationImpl.MAX_SLEEP_TIME_PROP, 300000);
    }

    /**
     * Main loop: waits for the worker queue to drain to half of its previously observed size,
     * scans every tablet whose retry time has passed (in random order), then sleeps until the
     * earliest retry time, but never less than the configured minimum sleep time.
     */
    @Override
    public void run() {
        int qSize = this.hwf.getWorkerQueue().size();
        while (!this.stopped.get()) {
            try {
                // Hold off scanning until the queue has drained to half of what it held after the last pass.
                while (this.hwf.getWorkerQueue().size() > qSize / 2 && !this.stopped.get()) {
                    UtilWaitThread.sleep(50L, this.stopped);
                }
                ArrayList<TabletInfoCache.TabletInfo<TabletData>> tablets = new ArrayList<TabletInfoCache.TabletInfo<TabletData>>(this.tabletInfoCache.getTablets());
                Collections.shuffle(tablets, this.rand);
                long minRetryTime = this.maxSleepTime + System.currentTimeMillis();
                int notifications = 0;
                int tabletsScanned = 0;
                try {
                    for (TabletInfoCache.TabletInfo<TabletData> tabletInfo : tablets) {
                        TabletData data = tabletInfo.getData();
                        if (System.currentTimeMillis() >= data.retryTime) {
                            int count = 0;
                            ModulusParams modParams = this.hwf.getModulusParams();
                            if (modParams != null) {
                                // Let pending asynchronous batch-writer work finish before scanning.
                                this.env.getSharedResources().getBatchWriter().waitForAsyncFlush();
                                count = this.scan(modParams, tabletInfo.getRange());
                                ++tabletsScanned;
                            }
                            data.updateScanCount(count, this.maxSleepTime);
                            notifications += count;
                            if (this.stopped.get()) {
                                break;
                            }
                        }
                        minRetryTime = Math.min(data.retryTime, minRetryTime);
                    }
                }
                catch (HashNotificationFinder.ModParamsChangedException mpce) {
                    this.hwf.getWorkerQueue().clear();
                    this.waitForFindersToStabilize();
                    // The loop was aborted, so minRetryTime only reflects the tablets visited before
                    // the change. Discard it so the next pass starts after the minimum sleep time
                    // instead of possibly waiting close to maxSleepTime while unvisited tablets are due.
                    minRetryTime = 0L;
                }
                long sleepTime = Math.max(this.minSleepTime, minRetryTime - System.currentTimeMillis());
                qSize = this.hwf.getWorkerQueue().size();
                log.debug("Scanned {} of {} tablets, added {} new notifications (total queued {})", tabletsScanned, tablets.size(), notifications, qSize);
                if (!this.stopped.get()) {
                    UtilWaitThread.sleep(sleepTime, this.stopped);
                }
            }
            catch (Exception e) {
                // Failures caused by an interrupt are logged quietly; everything else is an error.
                // Either way the loop keeps running until the stop flag is set.
                if (this.isInterruptedException(e)) {
                    log.debug("Error while looking for notifications", (Throwable)e);
                } else {
                    log.error("Error while looking for notifications", (Throwable)e);
                }
            }
        }
    }

    /**
     * Reports whether {@code e} or any exception in its cause chain is an
     * {@link InterruptedException}.
     *
     * @param e exception to inspect
     * @return {@code true} if an {@link InterruptedException} appears anywhere in the cause chain
     */
    private boolean isInterruptedException(Exception e) {
        for (Throwable cause = e; cause != null; cause = cause.getCause()) {
            if (cause instanceof InterruptedException) {
                return true;
            }
        }
        return false;
    }

    /**
     * Compares the {@code update} values of two modulus params. A {@code null} value only matches
     * another {@code null}, so parameters that appear or disappear count as a change.
     *
     * @param a first params, may be {@code null}
     * @param b second params, may be {@code null}
     * @return {@code true} if both are {@code null} or both carry the same {@code update} value
     */
    private static boolean sameUpdate(ModulusParams a, ModulusParams b) {
        if (a == null || b == null) {
            return a == b;
        }
        return a.update == b.update;
    }

    /**
     * Scans {@code range} with a {@link NotificationHashFilter} configured from {@code lmp} and
     * offers every returned notification to the worker queue.
     *
     * @param lmp modulus params the scan is based on; must not be {@code null}
     * @param range range to scan
     * @return number of notifications the worker queue accepted; a partial count if the stop flag
     *         was set during the scan
     * @throws TableNotFoundException if the scanner cannot be created for the configured table
     * @throws HashNotificationFinder.ModParamsChangedException if the finder's modulus params
     *         differ from {@code lmp} (or are no longer available) while iterating
     */
    private int scan(ModulusParams lmp, Range range) throws TableNotFoundException {
        Scanner scanner = this.env.getConnector().createScanner(this.env.getTable(), this.env.getAuthorizations());
        scanner.setRange(range);
        Notification.configureScanner(scanner);
        IteratorSetting iterCfg = new IteratorSetting(30, "nhf", NotificationHashFilter.class);
        NotificationHashFilter.setModulusParams(iterCfg, (int)lmp.divisor, (int)lmp.remainder);
        scanner.addScanIterator(iterCfg);
        int count = 0;
        for (Map.Entry<Key, ?> entry : scanner) {
            // The params are re-read for every entry; they may have become null, which is
            // treated as a change rather than dereferenced.
            if (!ScanTask.sameUpdate(lmp, this.hwf.getModulusParams())) {
                throw new HashNotificationFinder.ModParamsChangedException();
            }
            if (this.stopped.get()) {
                return count;
            }
            if (this.hwf.getWorkerQueue().addNotification(this.hwf, Notification.from(entry.getKey()))) {
                ++count;
            }
        }
        return count;
    }

    /**
     * Blocks until the finder's modulus params have stayed unchanged for {@link #STABILIZE_TIME}
     * milliseconds, or until the stop flag is set. Every observed change restarts the timer.
     */
    private void waitForFindersToStabilize() {
        ModulusParams last = this.hwf.getModulusParams();
        long startTime = System.currentTimeMillis();
        // Checking the stop flag here keeps shutdown from being held up for the remainder of
        // the stabilization window.
        while (!this.stopped.get() && System.currentTimeMillis() - startTime < STABILIZE_TIME) {
            UtilWaitThread.sleep(500L, this.stopped);
            ModulusParams current = this.hwf.getModulusParams();
            if (!ScanTask.sameUpdate(last, current)) {
                startTime = System.currentTimeMillis();
                last = current;
            }
        }
    }
}

