/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nutch.parse;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import org.apache.avro.util.Utf8;
import org.apache.gora.filter.FilterOp;
import org.apache.gora.filter.MapFieldValueFilter;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.SignatureFactory;
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.parse.ParseFilters;
import org.apache.nutch.parse.ParseStatusCodes;
import org.apache.nutch.parse.ParseUtil;
import org.apache.nutch.parse.ParserFactory;
import org.apache.nutch.storage.Mark;
import org.apache.nutch.storage.ParseStatus;
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.IdentityPageReducer;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
import org.apache.nutch.util.StringUtil;
import org.apache.nutch.util.TableUtil;
import org.apache.nutch.util.TimingUtil;
import org.apache.nutch.util.ToolUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParserJob
extends NutchTool
implements Tool {
    /** Logger used by this job and by the nested {@code ParserMapper}. */
    public static final Logger LOG = LoggerFactory.getLogger(ParserJob.class);
    /** Configuration key of the resume flag; written by {@code run(Map)} and read in the mapper's setup. */
    private static final String RESUME_KEY = "parse.job.resume";
    /** Configuration key of the force flag; written by {@code run(Map)} and read in the mapper's setup. */
    private static final String FORCE_KEY = "parse.job.force";
    /** Configuration key read in the mapper's setup (default {@code true}); when set, pages reported by {@code isTruncated} are skipped. */
    public static final String SKIP_TRUNCATED = "parser.skip.truncated";
    /** Special batch id: no batch filter is built for it and the mapper bypasses its fetch/parse mark checks. */
    private static final Utf8 REPARSE = new Utf8("-reparse");
    /** Base set of page fields; filled by the static initializer and copied by {@code getFields}. */
    private static final Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
    /** Configuration held by this tool; accessed through {@code getConf}/{@code setConf}. */
    private Configuration conf;

    /**
     * Creates a job without a configuration attached. {@code main} uses this
     * constructor and passes the configuration to {@code ToolRunner} separately.
     */
    public ParserJob() {
    }

    /**
     * Creates a job bound to the given configuration.
     *
     * @param conf configuration handed to {@link #setConf(Configuration)}
     */
    public ParserJob(Configuration conf) {
        setConf(conf);
    }

    /**
     * Tells whether the stored content of a page is shorter than the length
     * announced by its {@code Content-Length} header.
     *
     * <p>The page is reported as <em>not</em> truncated whenever the comparison
     * cannot be made: the content is missing, the headers map is missing, there
     * is no {@code Content-Length} header, the header is blank after trimming,
     * or it does not parse as a number (the last case is logged as a warning).
     *
     * @param url  page URL; used only in log messages
     * @param page page whose content and headers are inspected
     * @return {@code true} if the declared length is greater than the stored
     *         content size, {@code false} otherwise
     */
    public static boolean isTruncated(String url, WebPage page) {
        ByteBuffer content = page.getContent();
        if (content == null) {
            return false;
        }
        // NOTE(review): nullability of getHeaders() is not visible here; guard
        // defensively, mirroring the null check on the content above.
        if (page.getHeaders() == null) {
            return false;
        }
        CharSequence lengthUtf8 = page.getHeaders().get(new Utf8("Content-Length"));
        if (lengthUtf8 == null) {
            return false;
        }
        String lengthStr = lengthUtf8.toString().trim();
        if (StringUtil.isEmpty(lengthStr)) {
            return false;
        }
        // Parsed as long: a declared length above Integer.MAX_VALUE is a valid
        // header value and must be compared, not rejected as malformed.
        long inHeaderSize;
        try {
            inHeaderSize = Long.parseLong(lengthStr);
        } catch (NumberFormatException e) {
            LOG.warn("Wrong contentlength format for " + url, e);
            return false;
        }
        // The buffer's limit is taken as the number of bytes actually stored.
        int actualSize = content.limit();
        if (inHeaderSize > actualSize) {
            LOG.warn(url + " skipped. Content of size " + inHeaderSize + " was truncated to " + actualSize);
            return true;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(url + " actualSize=" + actualSize + " inHeaderSize=" + inHeaderSize);
        }
        return false;
    }

    /**
     * Assembles the page fields this job works with: the static base set plus
     * whatever the parser plugins, the signature implementation and the parse
     * filters report for the job's configuration.
     *
     * @param job job whose configuration is used to instantiate the factories
     * @return a new set holding the union of all reported fields; a source that
     *         reports {@code null} contributes nothing
     */
    public Collection<WebPage.Field> getFields(Job job) {
        Configuration jobConf = job.getConfiguration();

        ParserFactory parsers = new ParserFactory(jobConf);
        ParseFilters filters = new ParseFilters(jobConf);
        Collection<WebPage.Field> fromParsers = parsers.getFields();
        Collection<WebPage.Field> fromSignature = SignatureFactory.getFields(jobConf);
        Collection<WebPage.Field> fromFilters = filters.getFields();

        // Start from a copy so the shared base set is never modified.
        Collection<WebPage.Field> merged = new HashSet<WebPage.Field>(FIELDS);
        if (fromParsers != null) {
            merged.addAll(fromParsers);
        }
        if (fromSignature != null) {
            merged.addAll(fromSignature);
        }
        if (fromFilters != null) {
            merged.addAll(fromFilters);
        }
        return merged;
    }

    /**
     * Returns the configuration currently held by this tool.
     *
     * @return the stored configuration; {@code null} if none has been set
     */
    public Configuration getConf() {
        return conf;
    }

    /**
     * Stores the configuration this tool operates on.
     *
     * @param conf configuration later returned by {@code getConf()}
     */
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    /**
     * Configures and runs the parse job, then records its status.
     *
     * <p>Recognised entries of {@code args}, each copied into the configuration
     * only when present:
     * <ul>
     *   <li>{@code "batch"} (String) - stored under {@code generate.batch.id}</li>
     *   <li>{@code "resume"} (Boolean) - stored under the resume key</li>
     *   <li>{@code "force"} (Boolean) - stored under the force key</li>
     * </ul>
     * The job is set up with {@code ParserMapper} and zero reduce tasks. The
     * outcome of {@code waitForCompletion} is not inspected here; it is only
     * passed on through {@code ToolUtil.recordJobStatus}.
     *
     * @param args argument map as described above
     * @return this tool's results map after the job status has been recorded
     * @throws Exception if job setup or execution fails
     */
    @Override
    public Map<String, Object> run(Map<String, Object> args) throws Exception {
        String batchId = (String) args.get("batch");
        Boolean shouldResume = (Boolean) args.get("resume");
        Boolean force = (Boolean) args.get("force");

        if (batchId != null) {
            getConf().set("generate.batch.id", batchId);
        }
        if (shouldResume != null) {
            getConf().setBoolean(RESUME_KEY, shouldResume.booleanValue());
        }
        if (force != null) {
            getConf().setBoolean(FORCE_KEY, force.booleanValue());
        }

        LOG.info("ParserJob: resuming:\t" + getConf().getBoolean(RESUME_KEY, false));
        LOG.info("ParserJob: forced reparse:\t" + getConf().getBoolean(FORCE_KEY, false));
        if (batchId == null || batchId.equals("-all")) {
            LOG.info("ParserJob: parsing all");
        } else {
            LOG.info("ParserJob: batchId:\t" + batchId);
        }

        this.currentJob = new NutchJob(getConf(), "parse");
        Collection<WebPage.Field> fields = getFields(this.currentJob);
        // An absent batch id means "parse all" (as logged above), so no batch
        // filter applies and the filter helper is not consulted.
        MapFieldValueFilter<String, WebPage> batchIdFilter =
                batchId == null ? null : getBatchIdFilter(batchId);
        StorageUtils.initMapperJob(this.currentJob, fields, String.class, WebPage.class, ParserMapper.class, batchIdFilter);
        StorageUtils.initReducerJob(this.currentJob, IdentityPageReducer.class);
        this.currentJob.setNumReduceTasks(0);
        this.currentJob.waitForCompletion(true);
        ToolUtil.recordJobStatus(null, this.currentJob, this.results);
        return this.results;
    }

    /**
     * Builds the filter that restricts the job input to pages whose fetch mark
     * equals the given batch id.
     *
     * @param batchId batch id to match; may be {@code null}
     * @return {@code null} (no filtering) when {@code batchId} is {@code null},
     *         the reparse id or the "all" id; otherwise an {@code EQUALS} filter
     *         on the fetch-mark entry of the markers field with
     *         {@code batchId} as its single operand
     */
    private MapFieldValueFilter<String, WebPage> getBatchIdFilter(String batchId) {
        // A missing batch id is treated like "all": nothing to filter on.
        if (batchId == null
                || batchId.equals(REPARSE.toString())
                || batchId.equals(Nutch.ALL_CRAWL_ID.toString())) {
            return null;
        }
        MapFieldValueFilter<String, WebPage> filter = new MapFieldValueFilter<String, WebPage>();
        filter.setFieldName(WebPage.Field.MARKERS.toString());
        filter.setFilterOp(FilterOp.EQUALS);
        // NOTE(review): presumably excludes pages that have no fetch mark at all;
        // confirm against the Gora filter documentation.
        filter.setFilterIfMissing(true);
        filter.setMapKey(Mark.FETCH_MARK.getName());
        filter.getOperands().add(new Utf8(batchId));
        return filter;
    }

    /**
     * Runs the parse job for one batch and logs start time, finish time and
     * elapsed time around it.
     *
     * @param batchId      batch id passed on under the {@code "batch"} key
     * @param shouldResume value passed on under the {@code "resume"} key
     * @param force        value passed on under the {@code "force"} key
     * @return always {@code 0}; failures surface only as exceptions
     * @throws Exception whatever {@code run(Map)} throws
     */
    public int parse(String batchId, boolean shouldResume, boolean force) throws Exception {
        SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long startedAt = System.currentTimeMillis();
        LOG.info("ParserJob: starting at " + timestampFormat.format(startedAt));

        Map<String, Object> jobArgs = ToolUtil.toArgMap("batch", batchId, "resume", shouldResume, "force", force);
        run(jobArgs);
        LOG.info("ParserJob: success");

        long finishedAt = System.currentTimeMillis();
        String elapsed = TimingUtil.elapsedTime(startedAt, finishedAt);
        LOG.info("ParserJob: finished at " + timestampFormat.format(finishedAt) + ", time elapsed: " + elapsed);
        return 0;
    }

    /**
     * Command-line entry point: interprets the arguments and starts the parse.
     *
     * <p>{@code -resume} and {@code -force} set the corresponding flags,
     * {@code -crawlId <id>} stores {@code <id>} under {@code storage.crawl.id},
     * and {@code -all} selects the "all" batch id. Any other argument (including
     * {@code -reparse}) is taken as the batch id, and a second such argument is
     * rejected.
     *
     * @param args command-line arguments
     * @return {@code -1} after printing a message to standard error when the
     *         arguments are missing or invalid; otherwise the result of
     *         {@link #parse(String, boolean, boolean)}
     * @throws Exception whatever {@code parse} throws
     */
    public int run(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Usage: ParserJob (<batchId> | -all) [-crawlId <id>] [-resume] [-force]");
            System.err.println("    <batchId>     - symbolic batch ID created by Generator");
            System.err.println("    -crawlId <id> - the id to prefix the schemas to operate on, \n \t \t    (default: storage.crawl.id)");
            System.err.println("    -all          - consider pages from all crawl jobs");
            System.err.println("    -resume       - resume a previous incomplete job");
            System.err.println("    -force        - force re-parsing even if a page is already parsed");
            return -1;
        }

        boolean shouldResume = false;
        boolean force = false;
        String batchId = null;
        for (int i = 0; i < args.length; ++i) {
            String arg = args[i];
            if ("-resume".equals(arg)) {
                shouldResume = true;
            } else if ("-force".equals(arg)) {
                force = true;
            } else if ("-crawlId".equals(arg)) {
                // The option needs a value; without this check a trailing
                // "-crawlId" would index past the end of the array.
                if (i + 1 >= args.length) {
                    System.err.println("Missing value for -crawlId!");
                    return -1;
                }
                getConf().set("storage.crawl.id", args[++i]);
            } else if ("-all".equals(arg)) {
                batchId = arg;
            } else {
                if (batchId != null) {
                    System.err.println("BatchId already set to '" + batchId + "'!");
                    return -1;
                }
                batchId = arg;
            }
        }

        if (batchId == null) {
            System.err.println("BatchId not set (or -all/-reparse not specified)!");
            return -1;
        }
        return parse(batchId, shouldResume, force);
    }

    /**
     * Launches the tool through {@code ToolRunner} with a fresh Nutch
     * configuration and exits the JVM with the tool's return code.
     *
     * @param args command-line arguments forwarded to the tool
     * @throws Exception whatever {@code ToolRunner.run} throws
     */
    public static void main(String[] args) throws Exception {
        Configuration nutchConf = NutchConfiguration.create();
        int exitCode = ToolRunner.run(nutchConf, new ParserJob(), args);
        System.exit(exitCode);
    }

    static {
        // Base fields that getFields always includes, before any
        // plugin-contributed ones are merged in.
        WebPage.Field[] baseFields = {
            WebPage.Field.STATUS,
            WebPage.Field.CONTENT,
            WebPage.Field.CONTENT_TYPE,
            WebPage.Field.SIGNATURE,
            WebPage.Field.MARKERS,
            WebPage.Field.PARSE_STATUS,
            WebPage.Field.OUTLINKS,
            WebPage.Field.METADATA,
            WebPage.Field.HEADERS,
        };
        for (WebPage.Field field : baseFields) {
            FIELDS.add(field);
        }
    }

    public static class ParserMapper
    extends GoraMapper<String, WebPage, String, WebPage> {
        private ParseUtil parseUtil;
        private boolean shouldResume;
        private boolean force;
        private Utf8 batchId;
        private boolean skipTruncated;

        public void setup(Mapper.Context context) throws IOException {
            Configuration conf = context.getConfiguration();
            this.parseUtil = new ParseUtil(conf);
            this.shouldResume = conf.getBoolean(ParserJob.RESUME_KEY, false);
            this.force = conf.getBoolean(ParserJob.FORCE_KEY, false);
            this.batchId = new Utf8(conf.get("generate.batch.id", "-all"));
            this.skipTruncated = conf.getBoolean(ParserJob.SKIP_TRUNCATED, true);
        }

        /*
         * Enabled aggressive block sorting
         */
        public void map(String key, WebPage page, Mapper.Context context) throws IOException, InterruptedException {
            String unreverseKey = TableUtil.unreverseUrl(key);
            if (this.batchId.equals((Object)REPARSE)) {
                LOG.debug("Reparsing " + unreverseKey);
            } else {
                if (Mark.FETCH_MARK.checkMark(page) == null) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; not fetched yet");
                    }
                    return;
                }
                if (this.shouldResume && Mark.PARSE_MARK.checkMark(page) != null) {
                    if (!this.force) {
                        LOG.info("Skipping " + unreverseKey + "; already parsed");
                        return;
                    }
                    LOG.info("Forced parsing " + unreverseKey + "; already parsed");
                } else {
                    LOG.info("Parsing " + unreverseKey);
                }
            }
            if (this.skipTruncated && ParserJob.isTruncated(unreverseKey, page)) {
                return;
            }
            this.parseUtil.process(key, page);
            ParseStatus pstatus = page.getParseStatus();
            if (pstatus != null) {
                context.getCounter("ParserStatus", ParseStatusCodes.majorCodes[pstatus.getMajorCode()]).increment(1L);
            }
            context.write((Object)key, (Object)page);
        }
    }
}

