/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nutch.fetcher;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import org.apache.avro.util.Utf8;
import org.apache.gora.filter.FilterOp;
import org.apache.gora.filter.MapFieldValueFilter;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.URLPartitioner;
import org.apache.nutch.fetcher.FetchEntry;
import org.apache.nutch.fetcher.FetcherReducer;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.parse.ParserJob;
import org.apache.nutch.protocol.ProtocolFactory;
import org.apache.nutch.storage.Mark;
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
import org.apache.nutch.util.TableUtil;
import org.apache.nutch.util.TimingUtil;
import org.apache.nutch.util.ToolUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hadoop tool that fetches the pages of one generated batch, or of every generated batch
 * when the batch id is {@code -all}.
 *
 * <p>The map side ({@link FetcherMapper}) reads {@link WebPage} rows, optionally pre-filtered by
 * batch id at the storage layer, drops rows without a generate mark (and, when resuming, rows that
 * already carry a fetch mark) and forwards the rest as {@link FetchEntry} values. The reduce side is
 * {@link FetcherReducer} (presumably where pages are actually downloaded), run with speculative
 * execution disabled.
 */
public class FetcherJob extends NutchTool implements Tool {

    /** Public constant not referenced in this class; presumably consumed by fetcher/protocol code elsewhere (TODO confirm). */
    public static final String PROTOCOL_REDIR = "protocol";
    /** Public constant not referenced in this class (TODO confirm its unit and consumer). */
    public static final int PERM_REFRESH_TIME = 5;
    /** Public marker value not referenced in this class (TODO confirm its consumer). */
    public static final Utf8 REDIRECT_DISCOVERED = new Utf8("___rdrdsc__");
    /** Configuration key: when true, the mapper skips pages that already carry a fetch mark. */
    public static final String RESUME_KEY = "fetcher.job.resume";
    /** Configuration key: when true, the fields required by {@link ParserJob} are loaded as well. */
    public static final String PARSE_KEY = "fetcher.parse";
    /** Configuration key holding the number of fetcher threads (reported with a default of 10). */
    public static final String THREADS_KEY = "fetcher.threads.fetch";

    public static final Logger LOG = LoggerFactory.getLogger(FetcherJob.class);

    /** Batch id meaning "every generated batch". */
    private static final String ALL_BATCH_ID = "-all";

    /** Configuration key through which the batch id is handed to the map tasks. */
    private static final String BATCH_ID_KEY = "generate.batch.id";

    private static final String USAGE = "Usage: FetcherJob (<batchId> | -all) [-crawlId <id>] [-threads N] \n \t \t  [-resume] [-numTasks N]\n    <batchId>     - crawl identifier returned by Generator, or -all for all \n \t \t    generated batchId-s\n    -crawlId <id> - the id to prefix the schemas to operate on, \n \t \t    (default: storage.crawl.id)\n    -threads N    - number of fetching threads per task\n    -resume       - resume interrupted job\n    -numTasks N   - if N > 0 then use this many reduce tasks for fetching \n \t \t    (default: mapred.map.tasks)";

    /** Base set of WebPage fields always loaded; parser and protocol fields are added per job. */
    private static final Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();

    static {
        FIELDS.add(WebPage.Field.MARKERS);
        FIELDS.add(WebPage.Field.REPR_URL);
        FIELDS.add(WebPage.Field.FETCH_TIME);
    }

    /** Creates an unconfigured job; the configuration is supplied later (e.g. by ToolRunner in {@link #main}). */
    public FetcherJob() {
    }

    /**
     * Creates a job bound to the given configuration.
     *
     * @param conf the Hadoop/Nutch configuration to use
     */
    public FetcherJob(Configuration conf) {
        this.setConf(conf);
    }

    /**
     * Returns the WebPage fields this job must load.
     *
     * @param job the job whose configuration decides whether parser fields are needed
     * @return a new mutable set containing the base fields, the {@link ParserJob} fields when
     *     {@link #PARSE_KEY} is true, and the fields reported by {@link ProtocolFactory}
     */
    public Collection<WebPage.Field> getFields(Job job) {
        Configuration jobConf = job.getConfiguration();
        HashSet<WebPage.Field> fields = new HashSet<WebPage.Field>(FIELDS);
        if (jobConf.getBoolean(PARSE_KEY, false)) {
            fields.addAll(new ParserJob().getFields(job));
        }
        fields.addAll(new ProtocolFactory(jobConf).getFields());
        return fields;
    }

    /**
     * Configures and runs the fetch job, blocking until it completes.
     *
     * @param args optional entries: {@code "batch"} (String, defaults to {@code -all}),
     *     {@code "threads"} (Integer, applied only when &gt; 0), {@code "resume"} (Boolean) and
     *     {@code "numTasks"} (Integer; when absent or &lt; 1 the reducer count falls back to
     *     {@code mapred.map.tasks})
     * @return the results map populated by {@link ToolUtil#recordJobStatus}
     * @throws IllegalArgumentException if {@code http.agent.name} is not configured
     * @throws Exception if job setup or execution fails
     */
    @Override
    public Map<String, Object> run(Map<String, Object> args) throws Exception {
        this.checkConfiguration();
        String batchId = (String) args.get("batch");
        Integer threads = (Integer) args.get("threads");
        Boolean shouldResume = (Boolean) args.get("resume");
        Integer numTasks = (Integer) args.get("numTasks");

        Configuration conf = this.getConf();
        if (threads != null && threads > 0) {
            conf.setInt(THREADS_KEY, threads);
        }
        if (batchId == null) {
            batchId = ALL_BATCH_ID;
        }
        conf.set(BATCH_ID_KEY, batchId);
        if (shouldResume != null) {
            conf.setBoolean(RESUME_KEY, shouldResume);
        }
        LOG.info("FetcherJob: threads: " + conf.getInt(THREADS_KEY, 10));
        LOG.info("FetcherJob: parsing: " + conf.getBoolean(PARSE_KEY, false));
        LOG.info("FetcherJob: resuming: " + conf.getBoolean(RESUME_KEY, false));

        // A relative limit in minutes is turned into an absolute epoch-millis deadline.
        long timelimit = conf.getLong("fetcher.timelimit.mins", -1L);
        if (timelimit != -1L) {
            timelimit = System.currentTimeMillis() + timelimit * 60L * 1000L;
            conf.setLong("fetcher.timelimit", timelimit);
        }
        LOG.info("FetcherJob : timelimit set for : " + conf.getLong("fetcher.timelimit", -1L));

        this.numJobs = 1;
        this.currentJob = new NutchJob(conf, "fetch");
        this.currentJob.setReduceSpeculativeExecution(false);
        Collection<WebPage.Field> fields = this.getFields(this.currentJob);
        MapFieldValueFilter<String, WebPage> batchIdFilter = this.getBatchIdFilter(batchId);
        StorageUtils.initMapperJob(this.currentJob, fields, IntWritable.class, FetchEntry.class, FetcherMapper.class, URLPartitioner.FetchEntryPartitioner.class, batchIdFilter, false);
        StorageUtils.initReducerJob(this.currentJob, FetcherReducer.class);
        if (numTasks == null || numTasks < 1) {
            int fallback = this.currentJob.getConfiguration().getInt("mapred.map.tasks", this.currentJob.getNumReduceTasks());
            this.currentJob.setNumReduceTasks(fallback);
        } else {
            this.currentJob.setNumReduceTasks(numTasks);
        }
        // Success is checked by fetch() via currentJob.isSuccessful().
        this.currentJob.waitForCompletion(true);
        ToolUtil.recordJobStatus(null, this.currentJob, this.results);
        return this.results;
    }

    /**
     * Builds a storage-side filter keeping rows whose markers map has a generate-mark entry equal
     * to {@code batchId}; rows lacking the entry are filtered out ({@code setFilterIfMissing(true)}).
     *
     * @return the filter, or {@code null} (no filtering) for the all-batches id
     */
    private MapFieldValueFilter<String, WebPage> getBatchIdFilter(String batchId) {
        // NOTE(review): run() defaults to ALL_BATCH_ID ("-all"); this assumes Nutch.ALL_CRAWL_ID has the same text — confirm.
        if (batchId.equals(Nutch.ALL_CRAWL_ID.toString())) {
            return null;
        }
        MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
        filter.setFieldName(WebPage.Field.MARKERS.toString());
        filter.setFilterOp(FilterOp.EQUALS);
        filter.setFilterIfMissing(true);
        filter.setMapKey(Mark.GENERATE_MARK.getName());
        filter.getOperands().add(new Utf8(batchId));
        return filter;
    }

    /**
     * Runs the fetch job with explicit parameters and logs start/finish times.
     *
     * @param batchId batch to fetch, or {@code -all}
     * @param threads fetcher threads; values &lt;= 0 leave the configured value untouched
     * @param shouldResume whether to skip pages that already carry a fetch mark
     * @param numTasks reducer count; values &lt; 1 fall back to {@code mapred.map.tasks}
     * @return 0 if the job succeeded, -1 if it did not
     * @throws Exception if job setup or execution fails
     */
    public int fetch(String batchId, int threads, boolean shouldResume, int numTasks) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long start = System.currentTimeMillis();
        LOG.info("FetcherJob: starting at " + sdf.format(start));
        if (ALL_BATCH_ID.equals(batchId)) {
            LOG.info("FetcherJob: fetching all");
        } else {
            LOG.info("FetcherJob: batchId: " + batchId);
        }
        this.run(ToolUtil.toArgMap("batch", batchId, "threads", threads, "resume", shouldResume, "numTasks", numTasks));
        long finish = System.currentTimeMillis();
        LOG.info("FetcherJob: finished at " + sdf.format(finish) + ", time elapsed: " + TimingUtil.elapsedTime(start, finish));
        // Previously this always returned 0, so a failed job still produced a success exit code.
        if (!this.currentJob.isSuccessful()) {
            LOG.error("FetcherJob: job did not complete successfully");
            return -1;
        }
        return 0;
    }

    /**
     * Verifies that {@code http.agent.name} is set to a non-blank value.
     *
     * @throws IllegalArgumentException if the property is missing or blank
     */
    void checkConfiguration() {
        String agentName = this.getConf().get("http.agent.name");
        if (agentName == null || agentName.trim().length() == 0) {
            String message = "Fetcher: No agents listed in 'http.agent.name' property.";
            if (LOG.isErrorEnabled()) {
                LOG.error(message);
            }
            throw new IllegalArgumentException(message);
        }
    }

    /**
     * Command-line entry point; see the usage text for the accepted arguments.
     *
     * @return -1 on usage errors, otherwise the result of {@link #fetch}
     * @throws IllegalArgumentException on an unrecognized option or an option missing its value
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length == 0) {
            System.err.println(USAGE);
            return -1;
        }
        String batchId = args[0];
        if (!ALL_BATCH_ID.equals(batchId) && batchId.startsWith("-")) {
            System.err.println(USAGE);
            return -1;
        }
        int threads = -1;
        boolean shouldResume = false;
        int numTasks = -1;
        for (int i = 1; i < args.length; ++i) {
            String arg = args[i];
            if ("-threads".equals(arg)) {
                threads = Integer.parseInt(optionValue(args, ++i, arg));
            } else if ("-resume".equals(arg)) {
                shouldResume = true;
            } else if ("-numTasks".equals(arg)) {
                numTasks = Integer.parseInt(optionValue(args, ++i, arg));
            } else if ("-crawlId".equals(arg)) {
                this.getConf().set("storage.crawl.id", optionValue(args, ++i, arg));
            } else {
                throw new IllegalArgumentException("arg " + arg + " not recognized");
            }
        }
        return this.fetch(batchId, threads, shouldResume, numTasks);
    }

    /** Returns {@code args[index]}, failing cleanly when {@code option} is the last argument. */
    private static String optionValue(String[] args, int index, String option) {
        if (index >= args.length) {
            throw new IllegalArgumentException("arg " + option + " requires a value");
        }
        return args[index];
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(NutchConfiguration.create(), new FetcherJob(), args);
        System.exit(res);
    }

    /**
     * Map side of the fetch job: forwards generate-marked pages to the reducers.
     *
     * <p>Each forwarded page is wrapped in a {@link FetchEntry} keyed by a random int in
     * [0, 65536). The job's partitioner is {@code URLPartitioner.FetchEntryPartitioner}, so the
     * random key is presumably only used to spread entries (TODO confirm).
     */
    public static class FetcherMapper extends GoraMapper<String, WebPage, IntWritable, FetchEntry> {
        /** Mirrors {@link FetcherJob#RESUME_KEY}: skip pages that already carry a fetch mark. */
        private boolean resume;
        private final Random random = new Random();

        @Override
        protected void setup(Context context) {
            this.resume = context.getConfiguration().getBoolean(FetcherJob.RESUME_KEY, false);
        }

        @Override
        protected void map(String key, WebPage page, Context context) throws IOException, InterruptedException {
            if (Mark.GENERATE_MARK.checkMark(page) == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; not generated yet");
                }
                return;
            }
            if (this.resume && Mark.FETCH_MARK.checkMark(page) != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; already fetched");
                }
                return;
            }
            context.write(new IntWritable(this.random.nextInt(65536)), new FetchEntry(context.getConfiguration(), key, page));
        }
    }
}

