/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nutch.fetcher;

import crawlercommons.robots.BaseRobotRules;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.util.Utf8;
import org.apache.gora.mapreduce.GoraReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.nutch.fetcher.FetchEntry;
import org.apache.nutch.fetcher.FetcherJob;
import org.apache.nutch.host.HostDb;
import org.apache.nutch.net.URLFilterException;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.parse.ParseUtil;
import org.apache.nutch.parse.ParserJob;
import org.apache.nutch.protocol.Content;
import org.apache.nutch.protocol.Protocol;
import org.apache.nutch.protocol.ProtocolFactory;
import org.apache.nutch.protocol.ProtocolOutput;
import org.apache.nutch.protocol.ProtocolStatusUtils;
import org.apache.nutch.storage.Host;
import org.apache.nutch.storage.Mark;
import org.apache.nutch.storage.ProtocolStatus;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.TableUtil;
import org.apache.nutch.util.URLUtil;
import org.slf4j.Logger;

public class FetcherReducer
extends GoraReducer<IntWritable, FetchEntry, String, WebPage> {
    /** Logger shared with {@link FetcherJob}. */
    public static final Logger LOG = FetcherJob.LOG;
    // Number of FetcherThread instances currently inside their run() method.
    private final AtomicInteger activeThreads = new AtomicInteger(0);
    // Number of fetcher threads currently sleeping because no fetch item was available.
    private final AtomicInteger spinWaiting = new AtomicInteger(0);
    // Wall-clock time (ms) at which this reducer instance was created.
    private final long start = System.currentTimeMillis();
    // Wall-clock time (ms) at which a fetcher thread last picked up an item; used by run() to detect hung threads.
    private final AtomicLong lastRequestStart = new AtomicLong(this.start);
    // Running total of content bytes reported by the fetcher threads.
    private final AtomicLong bytes = new AtomicLong(0L);
    // Running total of fetch attempts that returned a protocol status.
    private final AtomicInteger pages = new AtomicInteger(0);
    // Running total of failures recorded through logFetchFailure().
    private final AtomicInteger errors = new AtomicInteger(0);
    // Thread that moves reducer input into fetchQueues; created in run().
    private QueueFeeder feeder;
    // All fetcher threads started by run(), kept so hung threads can be reported.
    private final List<FetcherThread> fetcherThreads = new ArrayList<FetcherThread>();
    // Per-host/domain/IP queues shared by the feeder and the fetcher threads; created in run().
    private FetchItemQueues fetchQueues;
    // Value of "fetcher.store.content": when false, fetched content is blanked before the page is written.
    private boolean storingContent;
    // Value of "fetcher.parse": when true, pages are parsed right after fetching.
    private boolean parse;
    // Parser used when parse is true; left null otherwise.
    private ParseUtil parseUtil;
    // Value of "parser.skip.truncated" (only read when parse is true): skip parsing of truncated pages.
    private boolean skipTruncated;

    /**
     * Builds a one-line progress summary, publishes it as the task status and logs it at INFO.
     *
     * <p>The line contains: spin-waiting/active thread counts, total pages and errors, the
     * average and the current pages/s (one decimal each), the average and the current kb/s,
     * the number of queued URLs and the number of queues.
     *
     * @param context     reducer context whose status string is updated
     * @param actualPages pages per second measured over the last reporting interval
     * @param actualBytes bytes per second measured over the last reporting interval
     * @param totalSize   number of URLs currently waiting in the fetch queues
     * @throws IOException declared for callers; kept for interface compatibility
     */
    private void reportAndLogStatus(Reducer.Context context, float actualPages, int actualBytes, int totalSize) throws IOException {
        // Clamp to one second: the average kb/s below uses integer division and would
        // otherwise throw ArithmeticException when called within the first second.
        long elapsed = Math.max(1L, (System.currentTimeMillis() - this.start) / 1000L);
        StringBuilder status = new StringBuilder();
        status.append(this.spinWaiting).append("/").append(this.activeThreads).append(" spinwaiting/active, ");
        status.append(this.pages).append(" pages, ").append(this.errors).append(" errors, ");
        // Average pages/s since start, rounded to one decimal.
        status.append((double)Math.round((float)this.pages.get() * 10.0f / (float)elapsed) / 10.0).append(" ");
        // Current pages/s, rounded to one decimal like the average next to it
        // (previously the scale-by-ten was cancelled before rounding, yielding a whole number).
        status.append((double)Math.round(actualPages * 10.0f) / 10.0).append(" pages/s, ");
        // Average kbit/s since start.
        status.append((long)Math.round((float)this.bytes.get() * 8.0f / 1024.0f) / elapsed).append(" ");
        // Current kbit/s.
        status.append(Math.round((float)actualBytes * 8.0f) / 1024).append(" kb/s, ");
        status.append(totalSize).append(" URLs in ");
        status.append(this.fetchQueues.getQueueCount()).append(" queues");
        String summary = status.toString();
        context.setStatus(summary);
        LOG.info(summary);
    }

    /**
     * Drives the whole fetch: reads the configuration, starts the queue feeder and the
     * fetcher threads, then periodically reports progress until all fetcher threads have
     * finished or no request has been started for half of "mapred.task.timeout".
     *
     * <p>While waiting, the loop also drops queued items once the fetch time limit has
     * passed (after the feeder has ended) and when throughput stays below
     * "fetcher.throughput.threshold.pages" for more than
     * "fetcher.throughput.threshold.sequence" consecutive checks.
     *
     * @param context reducer context supplying configuration, input, counters and output
     * @throws IOException          if the queues or the feeder cannot be set up, or reporting fails
     * @throws InterruptedException if reading the first input key is interrupted
     */
    public void run(Reducer.Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        this.fetchQueues = new FetchItemQueues(conf);
        int threadCount = conf.getInt("fetcher.threads.fetch", 10);
        this.parse = conf.getBoolean("fetcher.parse", false);
        this.storingContent = conf.getBoolean("fetcher.store.content", true);
        if (this.parse) {
            this.skipTruncated = conf.getBoolean("parser.skip.truncated", true);
            this.parseUtil = new ParseUtil(conf);
        }
        LOG.info("Fetcher: threads: " + threadCount);

        int maxFeedPerThread = conf.getInt("fetcher.queue.depth.multiplier", 50);
        this.feeder = new QueueFeeder(context, this.fetchQueues, threadCount * maxFeedPerThread);
        this.feeder.start();
        for (int i = 0; i < threadCount; ++i) {
            FetcherThread ft = new FetcherThread(context, i);
            this.fetcherThreads.add(ft);
            ft.start();
        }

        // Give up when no fetch has been started for half of the task timeout.
        long timeout = conf.getInt("mapred.task.timeout", 600000) / 2;
        int throughputThresholdPages = conf.getInt("fetcher.throughput.threshold.pages", -1);
        LOG.info("Fetcher: throughput threshold: " + throughputThresholdPages);
        int throughputThresholdSequence = conf.getInt("fetcher.throughput.threshold.sequence", 5);
        LOG.info("Fetcher: throughput threshold sequence: " + throughputThresholdSequence);
        long throughputThresholdTimeLimit = conf.getLong("fetcher.throughput.threshold.check.after", -1L);
        int throughputThresholdCurrentSequence = 0;

        // Single source of truth for the reporting interval (sleep time and rate divisor).
        final int secondsToSleep = 5;
        do {
            int pagesBefore = this.pages.get();
            long bytesBefore = this.bytes.get();
            try {
                Thread.sleep(secondsToSleep * 1000L);
            }
            catch (InterruptedException ignored) {
                // Deliberately ignored: an early wake-up only shortens this reporting interval.
            }
            // Differences are taken on the exact counters before converting, so large totals lose no precision.
            float pagesLastSec = (float)(this.pages.get() - pagesBefore) / secondsToSleep;
            int bytesLastSec = (int)((this.bytes.get() - bytesBefore) / secondsToSleep);

            int fetchQueuesTotalSize = this.fetchQueues.getTotalSize();
            this.reportAndLogStatus(context, pagesLastSec, bytesLastSec, fetchQueuesTotalSize);

            boolean feederAlive = this.feeder.isAlive();
            if (!feederAlive && fetchQueuesTotalSize < 5) {
                this.fetchQueues.dump();
            }
            if (!feederAlive) {
                int hitByTimeLimit = this.fetchQueues.checkTimelimit();
                if (hitByTimeLimit != 0) {
                    context.getCounter("FetcherStatus", "HitByTimeLimit-Queues").increment((long)hitByTimeLimit);
                }
            }

            if (throughputThresholdTimeLimit < System.currentTimeMillis() && throughputThresholdPages != -1) {
                if (pagesLastSec < (float)throughputThresholdPages) {
                    ++throughputThresholdCurrentSequence;
                    LOG.warn(Integer.toString(throughputThresholdCurrentSequence) + ": dropping below configured threshold of " + Integer.toString(throughputThresholdPages) + " pages per second");
                    if (throughputThresholdCurrentSequence > throughputThresholdSequence) {
                        LOG.warn("Dropped below threshold too many times in a row, killing!");
                        // Disable further throughput checks; the queues are emptied only once.
                        throughputThresholdPages = -1;
                        int hitByThrougputThreshold = this.fetchQueues.emptyQueues();
                        if (hitByThrougputThreshold != 0) {
                            context.getCounter("FetcherStatus", "hitByThrougputThreshold").increment((long)hitByThrougputThreshold);
                        }
                    }
                } else {
                    throughputThresholdCurrentSequence = 0;
                }
            }

            if (System.currentTimeMillis() - this.lastRequestStart.get() > timeout) {
                this.logHungThreads();
                return;
            }
        } while (this.activeThreads.get() > 0);
        LOG.info("-activeThreads=" + this.activeThreads);
    }

    /** Logs every fetcher thread that is still alive, with its stack trace when DEBUG is enabled. */
    private void logHungThreads() {
        if (!LOG.isWarnEnabled() || this.activeThreads.get() <= 0) {
            return;
        }
        LOG.warn("Aborting with " + this.activeThreads + " hung threads.");
        for (int i = 0; i < this.fetcherThreads.size(); ++i) {
            FetcherThread thread = this.fetcherThreads.get(i);
            if (!thread.isAlive()) {
                continue;
            }
            LOG.warn("Thread #" + i + " hung while processing " + thread.reprUrl);
            if (!LOG.isDebugEnabled()) {
                continue;
            }
            StringBuilder sb = new StringBuilder();
            sb.append("Stack of thread #").append(i).append(":\n");
            for (StackTraceElement frame : thread.getStackTrace()) {
                sb.append(frame.toString()).append('\n');
            }
            LOG.debug(sb.toString());
        }
    }

    private static class QueueFeeder
    extends Thread {
        private final Reducer.Context context;
        private final FetchItemQueues queues;
        private final int size;
        private Iterator<FetchEntry> currentIter;
        boolean hasMore;
        private long timelimit = -1L;

        public QueueFeeder(Reducer.Context context, FetchItemQueues queues, int size) throws IOException, InterruptedException {
            this.context = context;
            this.queues = queues;
            this.size = size;
            this.setDaemon(true);
            this.setName("QueueFeeder");
            this.hasMore = context.nextKey();
            if (this.hasMore) {
                this.currentIter = context.getValues().iterator();
            }
            this.timelimit = context.getConfiguration().getLong("fetcher.timelimit", -1L);
        }

        @Override
        public void run() {
            int cnt = 0;
            int timelimitcount = 0;
            try {
                while (this.hasMore) {
                    if (System.currentTimeMillis() >= this.timelimit && this.timelimit != -1L) {
                        while (this.currentIter.hasNext()) {
                            this.currentIter.next();
                            ++timelimitcount;
                        }
                        this.hasMore = this.context.nextKey();
                        if (!this.hasMore) continue;
                        this.currentIter = this.context.getValues().iterator();
                        continue;
                    }
                    int feed = this.size - this.queues.getTotalSize();
                    if (feed <= 0) {
                        try {
                            Thread.sleep(1000L);
                        }
                        catch (Exception e) {}
                        continue;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("-feeding " + feed + " input urls ...");
                    }
                    while (feed > 0 && this.currentIter.hasNext()) {
                        FetchEntry entry = this.currentIter.next();
                        String url = TableUtil.unreverseUrl(entry.getKey());
                        this.queues.addFetchItem(url, entry.getWebPage());
                        --feed;
                        ++cnt;
                    }
                    if (this.currentIter.hasNext()) continue;
                    this.hasMore = this.context.nextKey();
                    if (!this.hasMore) continue;
                    this.currentIter = this.context.getValues().iterator();
                }
            }
            catch (Exception e) {
                LOG.error("QueueFeeder error reading input, record " + cnt, (Throwable)e);
                return;
            }
            LOG.info("QueueFeeder finished: total " + cnt + " records. Hit by time limit :" + timelimitcount);
            this.context.getCounter("FetcherStatus", "HitByTimeLimit-QueueFeeder").increment((long)timelimitcount);
        }
    }

    private class FetcherThread
    extends Thread {
        // URL filters applied to redirect targets.
        private final URLFilters urlFilters;
        // URL normalizers created for the "fetcher" scope; applied to redirect targets.
        private final URLNormalizers normalizers;
        // Factory used to look up the protocol implementation for each URL.
        private final ProtocolFactory protocolFactory;
        // Upper bound (ms) for a robots.txt crawl delay; pages asking for more are skipped. Negative disables the check.
        private final long maxCrawlDelay;
        // Value of "fetcher.threads.per.host.by.ip"; not read anywhere in this class.
        private final boolean byIP;
        // Representative URL of the item this thread is currently working on; read by the reducer when reporting hung threads.
        private String reprUrl;
        // Reducer context used for counters and for writing fetched pages.
        private final Reducer.Context context;
        // Value of "db.ignore.external.links": when true, redirects to another host are not recorded.
        private final boolean ignoreExternalLinks;

        /**
         * Creates a daemon fetcher thread named {@code "FetcherThread" + num} and reads its
         * settings from the job configuration.
         *
         * @param context reducer context used for configuration, counters and output
         * @param num     index of this thread, used only for its name
         */
        public FetcherThread(Reducer.Context context, int num) {
            this.setDaemon(true);
            this.setName("FetcherThread" + num);
            this.context = context;
            Configuration conf = context.getConfiguration();
            this.urlFilters = new URLFilters(conf);
            this.protocolFactory = new ProtocolFactory(conf);
            this.normalizers = new URLNormalizers(conf, "fetcher");
            // Multiply as long: an int product overflows for delays above ~2.1 million seconds.
            this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000L;
            this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true);
            this.ignoreExternalLinks = conf.getBoolean("db.ignore.external.links", false);
        }

        /**
         * Main loop of a fetcher thread: repeatedly takes an item from the shared queues and
         * fetches it, sleeping 500 ms whenever no item is currently available. The thread
         * ends when the feeder has finished and the queues are empty, or when an error
         * escapes the per-item error handling.
         *
         * <p>The active-thread counter is incremented on entry and always decremented on exit.
         */
        @Override
        public void run() {
            FetcherReducer.this.activeThreads.incrementAndGet();
            FetchItem fit = null;
            try {
                while (true) {
                    fit = FetcherReducer.this.fetchQueues.getFetchItem();
                    if (fit == null) {
                        if (!FetcherReducer.this.feeder.isAlive() && FetcherReducer.this.fetchQueues.getTotalSize() <= 0) {
                            // Nothing queued and nothing more will arrive.
                            return;
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(this.getName() + " fetchQueues.getFetchItem() was null, spin-waiting ...");
                        }
                        FetcherReducer.this.spinWaiting.incrementAndGet();
                        try {
                            Thread.sleep(500L);
                        }
                        catch (Exception ignored) {
                            // Deliberately ignored: an early wake-up just retries sooner.
                        }
                        FetcherReducer.this.spinWaiting.decrementAndGet();
                        continue;
                    }

                    FetcherReducer.this.lastRequestStart.set(System.currentTimeMillis());
                    this.reprUrl = fit.page.getReprUrl() == null ? fit.url : TableUtil.toString(fit.page.getReprUrl());
                    try {
                        this.fetch(fit);
                    }
                    catch (Throwable t) {
                        // Unblock the queue, then record the page as failed so it is retried later.
                        FetcherReducer.this.fetchQueues.finishFetchItem(fit);
                        LOG.error("Unexpected error for " + fit.url, t);
                        this.output(fit, null, ProtocolStatusUtils.STATUS_FAILED, (byte)34);
                    }
                }
            }
            catch (Throwable e) {
                LOG.error("fetcher throwable caught", e);
            }
            finally {
                // fit is non-null here only when an error escaped while an item was being handled.
                if (fit != null) {
                    FetcherReducer.this.fetchQueues.finishFetchItem(fit);
                }
                FetcherReducer.this.activeThreads.decrementAndGet();
                LOG.info("-finishing thread " + this.getName() + ", activeThreads=" + FetcherReducer.this.activeThreads);
            }
        }

        /**
         * Fetches a single item: checks robots rules, applies the robots crawl delay to the
         * item's queue, retrieves the content and writes the page with a status derived from
         * the protocol status code.
         *
         * <p>NOTE(review): the numeric protocol codes and page status bytes below are
         * decompiled constants; presumably they correspond to Nutch's ProtocolStatusCodes and
         * CrawlStatus values - confirm against those classes before renaming them.
         *
         * @param fit the item to fetch; already registered as in progress in its queue
         * @throws Exception any failure; the caller records the page as failed
         */
        private void fetch(FetchItem fit) throws Exception {
            LOG.info("fetching " + fit.url + " (queue crawl delay=" + FetcherReducer.this.fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay + "ms)");
            Protocol protocol = this.protocolFactory.getProtocol(fit.url);
            BaseRobotRules rules = protocol.getRobotRules(fit.url, fit.page);
            if (!rules.isAllowed(fit.u.toString())) {
                // Release the queue immediately: no request was sent to the server.
                FetcherReducer.this.fetchQueues.finishFetchItem(fit, true);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Denied by robots.txt: " + fit.url);
                }
                this.output(fit, null, ProtocolStatusUtils.STATUS_ROBOTS_DENIED, (byte)3);
                return;
            }
            if (rules.getCrawlDelay() > 0L) {
                if (rules.getCrawlDelay() > this.maxCrawlDelay && this.maxCrawlDelay >= 0L) {
                    FetcherReducer.this.fetchQueues.finishFetchItem(fit, true);
                    LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping");
                    this.output(fit, null, ProtocolStatusUtils.STATUS_ROBOTS_DENIED, (byte)3);
                    return;
                }
                FetchItemQueue fiq = FetcherReducer.this.fetchQueues.getFetchItemQueue(fit.queueID);
                fiq.crawlDelay = rules.getCrawlDelay();
                if (LOG.isDebugEnabled()) {
                    // Logged at the level the guard checks (was LOG.info behind a debug guard).
                    LOG.debug("Crawl delay for queue: " + fit.queueID + " is set to " + fiq.crawlDelay + " as per robots.txt. url: " + fit.url);
                }
            }

            ProtocolOutput protocolOutput = protocol.getProtocolOutput(fit.url, fit.page);
            ProtocolStatus status = protocolOutput.getStatus();
            Content content = protocolOutput.getContent();
            FetcherReducer.this.fetchQueues.finishFetchItem(fit);
            this.context.getCounter("FetcherStatus", ProtocolStatusUtils.getName(status.getCode())).increment(1L);

            int length = 0;
            if (content != null && content.getContent() != null) {
                length = content.getContent().length;
            }
            this.updateStatus(length);

            switch (status.getCode()) {
                case 22: {
                    // Put the item back so it is fetched again later.
                    FetcherReducer.this.fetchQueues.addFetchItem(fit);
                    break;
                }
                case 1: {
                    this.output(fit, content, status, (byte)2);
                    break;
                }
                case 12:
                case 13: {
                    boolean temp = status.getCode() != 12;
                    byte code = temp ? (byte)4 : (byte)5;
                    String newUrl = ProtocolStatusUtils.getMessage(status);
                    this.handleRedirect(fit.url, newUrl, temp, "protocol", fit.page);
                    this.output(fit, content, status, code);
                    break;
                }
                case 16: {
                    this.logFetchFailure(fit.url, ProtocolStatusUtils.getMessage(status));
                }
                // Intentional fall-through: after logging, code 16 is written like 15 and 23.
                case 15:
                case 23: {
                    this.output(fit, null, status, (byte)34);
                    break;
                }
                case 11:
                case 14:
                case 17:
                case 18: {
                    this.output(fit, null, status, (byte)3);
                    break;
                }
                case 21: {
                    this.output(fit, null, status, (byte)38);
                    break;
                }
                default: {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("Unknown ProtocolStatus: " + status.getCode());
                    }
                    this.output(fit, null, status, (byte)34);
                }
            }
        }

        /**
         * Records a redirect target on the page: the normalized and filtered target is added
         * as an outlink, the page is flagged as having a discovered redirect, and the
         * representative URL is re-chosen.
         *
         * <p>Nothing is recorded when the target is filtered out, equals the source URL, or
         * (with external links ignored) points to a different host.
         *
         * @param url       URL that was fetched
         * @param newUrl    redirect target reported by the protocol
         * @param temp      whether the redirect is temporary
         * @param redirType label used in the debug message
         * @param page      page to update
         * @throws URLFilterException   if URL filtering fails
         * @throws IOException          if a URL cannot be normalized or parsed
         * @throws InterruptedException declared for callers; kept for interface compatibility
         */
        private void handleRedirect(String url, String newUrl, boolean temp, String redirType, WebPage page) throws URLFilterException, IOException, InterruptedException {
            newUrl = this.normalizers.normalize(newUrl, "fetcher");
            newUrl = this.urlFilters.filter(newUrl);
            if (newUrl == null || newUrl.equals(url)) {
                return;
            }
            if (this.ignoreExternalLinks) {
                String toHost = new URL(newUrl).getHost();
                String fromHost = new URL(url).getHost();
                // Null-safe, locale-independent comparison. The previous code lowercased
                // first, so its null check could never trigger and a missing host threw.
                if (toHost == null || !toHost.equalsIgnoreCase(fromHost)) {
                    return;
                }
            }
            page.getOutlinks().put((CharSequence)new Utf8(newUrl), (CharSequence)new Utf8());
            page.getMetadata().put((CharSequence)FetcherJob.REDIRECT_DISCOVERED, TableUtil.YES_VAL);
            this.reprUrl = URLUtil.chooseRepr(this.reprUrl, newUrl, temp);
            if (this.reprUrl == null) {
                LOG.warn("reprUrl==null");
                return;
            }
            page.setReprUrl((CharSequence)new Utf8(this.reprUrl));
            if (LOG.isDebugEnabled()) {
                LOG.debug(" - " + redirType + " redirect to " + this.reprUrl + " (fetching later)");
            }
        }

        /**
         * Adds one fetched page and its size to the reducer-wide counters.
         *
         * @param bytesInPage number of content bytes of the page just fetched
         * @throws IOException declared for callers; kept for interface compatibility
         */
        private void updateStatus(int bytesInPage) throws IOException {
            FetcherReducer reducer = FetcherReducer.this;
            reducer.pages.incrementAndGet();
            reducer.bytes.addAndGet(bytesInPage);
        }

        /**
         * Updates the page of a fetch item with the fetch result and writes it to the
         * reducer output under the reversed URL.
         *
         * <p>Sets the page status, shifts the previous fetch time, stamps the current time,
         * stores protocol status and content when given, copies the generate mark into the
         * fetch mark, optionally parses the page, and blanks the content when content
         * storing is disabled.
         *
         * @param fit     item whose page is updated
         * @param content fetched content, or {@code null} when there is none to store
         * @param pstatus protocol status to store, or {@code null} to leave it untouched
         * @param status  page status byte to set
         * @throws IOException          if writing the page fails
         * @throws InterruptedException if writing the page is interrupted
         */
        private void output(FetchItem fit, Content content, ProtocolStatus pstatus, byte status) throws IOException, InterruptedException {
            WebPage page = fit.page;
            page.setStatus(Integer.valueOf(status));
            long previousFetchTime = page.getFetchTime();
            page.setPrevFetchTime(previousFetchTime);
            page.setFetchTime(System.currentTimeMillis());
            if (pstatus != null) {
                page.setProtocolStatus(pstatus);
            }
            boolean hasContent = content != null;
            if (hasContent) {
                page.setContent(ByteBuffer.wrap(content.getContent()));
                page.setContentType((CharSequence)new Utf8(content.getContentType()));
                page.setBaseUrl((CharSequence)new Utf8(content.getBaseUrl()));
            }
            Mark.FETCH_MARK.putMark(page, Mark.GENERATE_MARK.checkMark(page));
            String key = TableUtil.reverseUrl(fit.url);

            // Parse unless truncated pages are to be skipped and this page is truncated.
            boolean shouldParse = FetcherReducer.this.parse
                && (!FetcherReducer.this.skipTruncated || !ParserJob.isTruncated(fit.url, page));
            if (shouldParse) {
                FetcherReducer.this.parseUtil.process(key, page);
            }
            // Content was needed for parsing above; drop it now if it must not be stored.
            if (hasContent && !FetcherReducer.this.storingContent) {
                page.setContent(ByteBuffer.wrap(new byte[0]));
            }
            this.context.write((Object)key, (Object)page);
        }

        /**
         * Logs a failed fetch at WARN and bumps the reducer-wide error counter.
         *
         * @param url     URL whose fetch failed
         * @param message failure description to include in the log line
         */
        private void logFetchFailure(String url, String message) {
            String warning = "fetch of " + url + " failed with: " + message;
            LOG.warn(warning);
            FetcherReducer.this.errors.incrementAndGet();
        }
    }

    /**
     * Collection of per-queue fetch lists keyed by queue id (protocol plus host, domain or
     * IP, depending on the queue mode), shared by the feeder and all fetcher threads.
     *
     * <p>Every access to the {@code queues} map is made while holding this object's
     * monitor; {@code totalSize} tracks the number of items waiting across all queues.
     */
    private static class FetchItemQueues {
        public static final String DEFAULT_ID = "default";
        public static final String QUEUE_MODE_HOST = "byHost";
        public static final String QUEUE_MODE_DOMAIN = "byDomain";
        public static final String QUEUE_MODE_IP = "byIP";
        // Guarded by "this": a plain HashMap must not be read while another thread mutates it.
        final Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
        final AtomicInteger totalSize = new AtomicInteger(0);
        int maxThreads;
        String queueMode;
        long crawlDelay;
        long minCrawlDelay;
        Configuration conf;
        long timelimit = -1L;
        boolean useHostSettings = false;
        HostDb hostDb = null;

        /**
         * Reads queue mode, per-queue thread limit, crawl delays and the time limit from
         * the configuration. An unknown queue mode falls back to {@code byHost}.
         *
         * @param conf job configuration
         * @throws IOException if the host database cannot be opened
         */
        public FetchItemQueues(Configuration conf) throws IOException {
            this.conf = conf;
            this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
            this.queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST);
            if (!(this.queueMode.equals(QUEUE_MODE_IP) || this.queueMode.equals(QUEUE_MODE_DOMAIN) || this.queueMode.equals(QUEUE_MODE_HOST))) {
                LOG.error("Unknown partition mode : " + this.queueMode + " - forcing to byHost");
                this.queueMode = QUEUE_MODE_HOST;
            }
            LOG.info("Using queue mode : " + this.queueMode);
            if (this.queueMode.equals(QUEUE_MODE_HOST)) {
                this.useHostSettings = conf.getBoolean("fetcher.queue.use.host.settings", false);
                if (this.useHostSettings) {
                    LOG.info("Host specific queue settings enabled.");
                    this.hostDb = new HostDb(conf);
                }
            }
            // Delays are configured in seconds and kept in milliseconds.
            this.crawlDelay = (long)(conf.getFloat("fetcher.server.delay", 1.0f) * 1000.0f);
            this.minCrawlDelay = (long)(conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000.0f);
            this.timelimit = conf.getLong("fetcher.timelimit", -1L);
        }

        /** @return number of items currently waiting in all queues */
        public int getTotalSize() {
            return this.totalSize.get();
        }

        /** @return number of queues currently registered */
        public synchronized int getQueueCount() {
            return this.queues.size();
        }

        /**
         * Creates a fetch item for the URL and queues it; URLs for which no item can be
         * created are silently dropped.
         */
        public void addFetchItem(String url, WebPage page) {
            FetchItem it = FetchItem.create(url, page, this.queueMode);
            if (it != null) {
                this.addFetchItem(it);
            }
        }

        /** Appends the item to its queue, creating the queue on first use. */
        public synchronized void addFetchItem(FetchItem it) {
            this.getFetchItemQueue(it.queueID).addFetchItem(it);
            this.totalSize.incrementAndGet();
        }

        /** Marks the item as finished and schedules the queue's next fetch after its delay. */
        public void finishFetchItem(FetchItem it) {
            this.finishFetchItem(it, false);
        }

        /**
         * Marks the item as finished in its queue.
         *
         * @param it   item that was being fetched
         * @param asap when true the queue may be used again immediately, without a delay
         */
        public void finishFetchItem(FetchItem it, boolean asap) {
            FetchItemQueue fiq;
            // Look the queue up under the lock; other threads add and remove map entries.
            synchronized (this) {
                fiq = this.queues.get(it.queueID);
            }
            if (fiq == null) {
                LOG.warn("Attempting to finish item from unknown queue: " + it);
                return;
            }
            fiq.finishFetchItem(it, asap);
        }

        /**
         * Returns the queue with the given id, creating it when missing. With host
         * settings enabled, a new queue takes its limits from the host database entry
         * (when one exists); otherwise the configured defaults are used.
         */
        public synchronized FetchItemQueue getFetchItemQueue(String id) {
            FetchItemQueue fiq = this.queues.get(id);
            if (fiq != null) {
                return fiq;
            }
            if (this.useHostSettings) {
                try {
                    // Queue ids have the form "<protocol>://<host>".
                    String hostname = id.substring(id.indexOf("://") + 3);
                    Host host = this.hostDb.getByHostName(hostname);
                    if (host != null) {
                        fiq = new FetchItemQueue(this.conf, host.getInt("q_mt", this.maxThreads), host.getLong("q_cd", this.crawlDelay), host.getLong("q_mcd", this.minCrawlDelay));
                    }
                }
                catch (IOException e) {
                    LOG.error("Error while trying to access host settings", (Throwable)e);
                }
            }
            if (fiq == null) {
                fiq = new FetchItemQueue(this.conf, this.maxThreads, this.crawlDelay, this.minCrawlDelay);
            }
            this.queues.put(id, fiq);
            return fiq;
        }

        /**
         * Returns the next item that may be fetched now, or {@code null} when no queue
         * can currently provide one. Queues with nothing queued and nothing in progress
         * are removed along the way.
         */
        public synchronized FetchItem getFetchItem() {
            Iterator<Map.Entry<String, FetchItemQueue>> it = this.queues.entrySet().iterator();
            while (it.hasNext()) {
                FetchItemQueue fiq = it.next().getValue();
                if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
                    it.remove();
                    continue;
                }
                FetchItem fit = fiq.getFetchItem();
                if (fit != null) {
                    this.totalSize.decrementAndGet();
                    return fit;
                }
            }
            return null;
        }

        /**
         * Empties all queues once the configured time limit has passed.
         *
         * @return number of items dropped, 0 when the limit is unset or not yet reached
         */
        public synchronized int checkTimelimit() {
            if (this.timelimit != -1L && System.currentTimeMillis() >= this.timelimit) {
                return this.emptyQueues();
            }
            return 0;
        }

        /** Logs the state of every queue that still has items waiting. */
        public synchronized void dump() {
            for (Map.Entry<String, FetchItemQueue> entry : this.queues.entrySet()) {
                FetchItemQueue fiq = entry.getValue();
                if (fiq.getQueueSize() == 0) {
                    continue;
                }
                LOG.info("* queue: " + entry.getKey());
                fiq.dump();
            }
        }

        /**
         * Drops all waiting items from every queue.
         *
         * @return number of items dropped
         */
        public synchronized int emptyQueues() {
            int count = 0;
            for (Map.Entry<String, FetchItemQueue> entry : this.queues.entrySet()) {
                FetchItemQueue fiq = entry.getValue();
                if (fiq.getQueueSize() == 0) {
                    continue;
                }
                LOG.info("* queue: " + entry.getKey() + " >> dropping! ");
                int deleted = fiq.emptyQueue();
                this.totalSize.addAndGet(-deleted);
                count += deleted;
            }
            // With no queues left nothing can be waiting; repair a counter that drifted.
            if (this.totalSize.get() != 0 && this.queues.size() == 0) {
                this.totalSize.set(0);
            }
            return count;
        }
    }

    /**
     * Fetch list of a single queue: items waiting to be fetched, items currently being
     * fetched, and the earliest time at which the next item may be handed out.
     */
    private static class FetchItemQueue {
        final List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>());
        final Set<FetchItem> inProgress = Collections.synchronizedSet(new HashSet<FetchItem>());
        final AtomicLong nextFetchTime = new AtomicLong();
        // Written by fetcher threads (robots.txt crawl delay) and read by whichever thread
        // finishes an item, hence volatile.
        volatile long crawlDelay;
        long minCrawlDelay;
        int maxThreads;

        /**
         * @param conf          job configuration (not used by this class)
         * @param maxThreads    maximum number of items of this queue fetched at the same time
         * @param crawlDelay    delay in ms between fetches when maxThreads is 1
         * @param minCrawlDelay delay in ms between fetches when maxThreads is greater than 1
         */
        public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) {
            this.maxThreads = maxThreads;
            this.crawlDelay = crawlDelay;
            this.minCrawlDelay = minCrawlDelay;
            // Back-date the end time so a single-threaded queue can be fetched immediately.
            this.setEndTime(System.currentTimeMillis() - crawlDelay);
        }

        /** @return number of items waiting in this queue */
        public int getQueueSize() {
            return this.queue.size();
        }

        /** @return number of items of this queue currently being fetched */
        public int getInProgressSize() {
            return this.inProgress.size();
        }

        /**
         * Removes the item from the in-progress set and schedules the next fetch.
         *
         * @param it   finished item; {@code null} is ignored
         * @param asap when true the next fetch may start immediately
         */
        public void finishFetchItem(FetchItem it, boolean asap) {
            if (it != null) {
                this.inProgress.remove(it);
                this.setEndTime(System.currentTimeMillis(), asap);
            }
        }

        /** Appends the item to the waiting list; {@code null} is ignored. */
        public void addFetchItem(FetchItem it) {
            if (it == null) {
                return;
            }
            this.queue.add(it);
        }

        /** Registers the item as being fetched; {@code null} is ignored. */
        public void addInProgressFetchItem(FetchItem it) {
            if (it == null) {
                return;
            }
            this.inProgress.add(it);
        }

        /**
         * Hands out the next waiting item and marks it as in progress.
         *
         * @return the item, or {@code null} when the thread limit is reached, the next
         *         fetch time has not come yet, or nothing is waiting
         */
        public FetchItem getFetchItem() {
            if (this.inProgress.size() >= this.maxThreads) {
                return null;
            }
            if (this.nextFetchTime.get() > System.currentTimeMillis()) {
                return null;
            }
            if (this.queue.isEmpty()) {
                return null;
            }
            FetchItem it = null;
            try {
                it = this.queue.remove(0);
                this.inProgress.add(it);
            }
            catch (Exception e) {
                LOG.error("Cannot remove FetchItem from queue or cannot add it to inProgress queue", (Throwable)e);
            }
            return it;
        }

        /** Logs this queue's settings and the URLs still waiting. */
        public synchronized void dump() {
            LOG.info("  maxThreads    = " + this.maxThreads);
            LOG.info("  inProgress    = " + this.inProgress.size());
            LOG.info("  crawlDelay    = " + this.crawlDelay);
            LOG.info("  minCrawlDelay = " + this.minCrawlDelay);
            LOG.info("  nextFetchTime = " + this.nextFetchTime.get());
            LOG.info("  now           = " + System.currentTimeMillis());
            // Iterate instead of calling get(i) on a linked list; iterating a synchronized
            // list requires holding its lock.
            synchronized (this.queue) {
                int index = 0;
                for (FetchItem it : this.queue) {
                    LOG.info("  " + index + ". " + it.url);
                    ++index;
                }
            }
        }

        private void setEndTime(long endTime) {
            this.setEndTime(endTime, false);
        }

        /**
         * Sets the earliest time of the next fetch: {@code endTime} itself when
         * {@code asap}, otherwise {@code endTime} plus the applicable delay.
         */
        private void setEndTime(long endTime, boolean asap) {
            if (asap) {
                this.nextFetchTime.set(endTime);
                return;
            }
            long delay = this.maxThreads > 1 ? this.minCrawlDelay : this.crawlDelay;
            this.nextFetchTime.set(endTime + delay);
        }

        /**
         * Drops all waiting items.
         *
         * @return number of items that were waiting
         */
        public synchronized int emptyQueue() {
            int presize = this.queue.size();
            this.queue.clear();
            return presize;
        }
    }

    private static class FetchItem {
        WebPage page;
        String queueID;
        String url;
        URL u;

        public FetchItem(String url, WebPage page, URL u, String queueID) {
            this.page = page;
            this.url = url;
            this.u = u;
            this.queueID = queueID;
        }

        public static FetchItem create(String url, WebPage page, String queueMode) {
            String host;
            URL u = null;
            try {
                u = new URL(url);
            }
            catch (Exception e) {
                LOG.warn("Cannot parse url: " + url, (Throwable)e);
                return null;
            }
            String proto = u.getProtocol().toLowerCase();
            if ("byIP".equalsIgnoreCase(queueMode)) {
                try {
                    InetAddress addr = InetAddress.getByName(u.getHost());
                    host = addr.getHostAddress();
                }
                catch (UnknownHostException e) {
                    LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
                    return null;
                }
            } else if ("byDomain".equalsIgnoreCase(queueMode)) {
                host = URLUtil.getDomainName(u);
                if (host == null) {
                    LOG.warn("Unknown domain for url: " + url + ", using URL string as key");
                    host = u.toExternalForm();
                }
            } else {
                host = u.getHost();
                if (host == null) {
                    LOG.warn("Unknown host for url: " + url + ", using URL string as key");
                    host = u.toExternalForm();
                }
            }
            String queueID = proto + "://" + host.toLowerCase();
            return new FetchItem(url, page, u, queueID);
        }

        public String toString() {
            return "FetchItem [queueID=" + this.queueID + ", url=" + this.url + ", u=" + this.u + ", page=" + (Object)((Object)this.page) + "]";
        }
    }
}

