/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.core.worker;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.core.impl.Environment;
import org.apache.fluo.core.impl.Notification;
import org.apache.fluo.core.util.FluoExecutors;
import org.apache.fluo.core.util.Hex;
import org.apache.fluo.core.worker.NotificationFinder;
import org.apache.fluo.core.worker.Observers;
import org.apache.fluo.core.worker.WorkTaskAsync;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NotificationProcessor
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(NotificationProcessor.class);
    private NotificationTracker tracker;
    private ThreadPoolExecutor executor;
    private Environment env;
    private Observers observers;
    private PriorityBlockingQueue<Runnable> queue;

    public NotificationProcessor(Environment env) {
        int numThreads = env.getConfiguration().getWorkerThreads();
        this.env = env;
        this.queue = new PriorityBlockingQueue();
        this.executor = FluoExecutors.newFixedThreadPool(numThreads, this.queue, "ntfyProc");
        this.tracker = new NotificationTracker();
        this.observers = new Observers(env);
        env.getSharedResources().getMetricRegistry().register(env.getMetricNames().getNotificationQueued(), (Metric)new Gauge<Integer>(){

            public Integer getValue() {
                return NotificationProcessor.this.queue.size();
            }
        });
    }

    public boolean addNotification(NotificationFinder notificationFinder, Notification notification) {
        WorkTaskAsync workTask = new WorkTaskAsync(this, notificationFinder, this.env, notification, this.observers);
        FutureNotificationTask ft = new FutureNotificationTask(notification, notificationFinder, workTask);
        if (!this.tracker.add(notification.getRowColumn(), ft)) {
            return false;
        }
        try {
            this.executor.execute(ft);
        }
        catch (RejectedExecutionException rje) {
            this.tracker.remove(notification.getRowColumn());
            throw rje;
        }
        return true;
    }

    public void requeueNotification(NotificationFinder notificationFinder, Notification notification) {
        WorkTaskAsync workTask = new WorkTaskAsync(this, notificationFinder, this.env, notification, this.observers);
        FutureNotificationTask ft = new FutureNotificationTask(notification, notificationFinder, workTask);
        if (this.tracker.requeue(notification.getRowColumn(), ft)) {
            try {
                this.executor.execute(ft);
            }
            catch (RejectedExecutionException rje) {
                this.tracker.remove(notification.getRowColumn());
                throw rje;
            }
        }
    }

    public void notificationProcessed(Notification notification) {
        this.tracker.remove(notification.getRowColumn());
    }

    public int size() {
        return this.queue.size();
    }

    public void clear() {
        this.tracker.clear();
        this.executor.purge();
    }

    @Override
    public void close() {
        this.executor.shutdownNow();
        this.observers.close();
        try {
            while (!this.executor.awaitTermination(1L, TimeUnit.SECONDS)) {
            }
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private static class FutureNotificationTask
    extends FutureTask<Void>
    implements Comparable<FutureNotificationTask> {
        private final Notification notification;

        public FutureNotificationTask(Notification n, NotificationFinder nf, WorkTaskAsync wt) {
            super(new NotificationProcessingTask(n, nf, wt), null);
            this.notification = n;
        }

        @Override
        public int compareTo(FutureNotificationTask o) {
            return Long.compare(this.notification.getTimestamp(), o.notification.getTimestamp());
        }

        @Override
        protected void setException(Throwable t) {
            super.setException(t);
            System.err.println("Uncaught Exception ");
            t.printStackTrace();
        }
    }

    private static class NotificationProcessingTask
    implements Runnable {
        Notification notification;
        NotificationFinder notificationFinder;
        WorkTaskAsync workTask;

        NotificationProcessingTask(Notification n, NotificationFinder nf, WorkTaskAsync wt) {
            this.notification = n;
            this.notificationFinder = nf;
            this.workTask = wt;
        }

        @Override
        public void run() {
            try {
                if (this.notificationFinder.shouldProcess(this.notification)) {
                    this.workTask.run();
                }
            }
            catch (Exception e) {
                log.error("Failed to process work " + Hex.encNonAscii(this.notification), (Throwable)e);
            }
        }
    }

    private class NotificationTracker {
        private Map<RowColumn, Future<?>> queuedWork = new HashMap();
        private long sizeInBytes = 0L;
        private static final long MAX_SIZE = 0x1000000L;

        private NotificationTracker() {
        }

        private long size(RowColumn rowCol) {
            Column col = rowCol.getColumn();
            return rowCol.getRow().length() + col.getFamily().length() + col.getQualifier().length() + col.getVisibility().length();
        }

        public synchronized boolean add(RowColumn rowCol, Future<?> task) {
            if (this.queuedWork.containsKey(rowCol)) {
                return false;
            }
            while (this.sizeInBytes > 0x1000000L) {
                try {
                    this.wait(1000L);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            if (this.queuedWork.containsKey(rowCol)) {
                return false;
            }
            this.queuedWork.put(rowCol, task);
            this.sizeInBytes += this.size(rowCol);
            return true;
        }

        public synchronized void remove(RowColumn rowCol) {
            if (this.queuedWork.remove(rowCol) != null) {
                this.sizeInBytes -= this.size(rowCol);
                this.notify();
            }
        }

        public synchronized void clear() {
            for (Future<?> task : this.queuedWork.values()) {
                task.cancel(false);
            }
            this.queuedWork.clear();
            this.sizeInBytes = 0L;
            this.notify();
        }

        public boolean requeue(RowColumn rowCol, FutureTask<?> ft) {
            if (!this.queuedWork.containsKey(rowCol)) {
                return false;
            }
            this.queuedWork.put(rowCol, ft);
            return true;
        }
    }
}

