/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nutch.crawl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.avro.util.Utf8;
import org.apache.gora.mapreduce.GoraOutputFormat;
import org.apache.gora.store.DataStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.DbUpdaterJob;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.scoring.ScoringFilterException;
import org.apache.nutch.scoring.ScoringFilters;
import org.apache.nutch.storage.Mark;
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
import org.apache.nutch.util.TableUtil;
import org.apache.nutch.util.TimingUtil;
import org.apache.nutch.util.ToolUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InjectorJob
extends NutchTool
implements Tool {
    /** Logger used by this job and by the nested {@link UrlMapper}. */
    public static final Logger LOG = LoggerFactory.getLogger(InjectorJob.class);
    /**
     * Set of {@link WebPage} fields; the static initializer adds MARKERS and STATUS.
     * Nothing in this file reads it.
     */
    private static final Set<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
    /** Value stored with {@code Mark.INJECT_MARK} on every injected row. */
    private static final Utf8 YES_STRING = new Utf8("y");
    /**
     * Seed-line metadata name whose value replaces the default injected score.
     * Assigned {@code "nutch.score"} by the static initializer.
     */
    public static String nutchScoreMDName;
    /**
     * Seed-line metadata name whose value replaces the default fetch interval.
     * Assigned {@code "nutch.fetchInterval"} by the static initializer.
     */
    public static String nutchFetchIntervalMDName;

    /**
     * Creates an injector without setting a configuration; the constructor body
     * is intentionally empty.
     */
    public InjectorJob() {
    }

    /**
     * Creates an injector and hands it the given configuration.
     *
     * @param conf configuration passed straight to {@code setConf}
     */
    public InjectorJob(Configuration conf) {
        setConf(conf);
    }

    /**
     * Configures and runs one map-only inject job over the seed directory, then
     * logs how many URLs were injected and how many were rejected.
     *
     * <p>The current wall-clock time is stored in the configuration under
     * {@code injector.current.time} before the job is created, so that
     * {@link UrlMapper} can read one shared timestamp.
     *
     * @param args argument map; the entry under {@code "seedDir"} names the seed
     *             location and may be a {@link Path} or any object whose
     *             {@code toString()} yields a path
     * @return this tool's {@code results} map, after
     *         {@code ToolUtil.recordJobStatus} has been called with it
     * @throws IllegalArgumentException if {@code args} has no {@code "seedDir"} entry
     * @throws Exception if setting up or running the job fails
     */
    @Override
    public Map<String, Object> run(Map<String, Object> args) throws Exception {
        Object path = args.get("seedDir");
        if (path == null) {
            // Fail with a clear message instead of a bare NPE on path.toString().
            throw new IllegalArgumentException("InjectorJob: missing required argument 'seedDir'");
        }
        Path input = path instanceof Path ? (Path) path : new Path(path.toString());

        this.getConf().setLong("injector.current.time", System.currentTimeMillis());

        this.numJobs = 1;
        this.currentJobNum = 0;
        this.currentJob = new NutchJob(this.getConf(), "inject " + input);
        FileInputFormat.addInputPath(this.currentJob, input);
        this.currentJob.setMapperClass(UrlMapper.class);
        this.currentJob.setMapOutputKeyClass(String.class);
        this.currentJob.setMapOutputValueClass(WebPage.class);
        this.currentJob.setOutputFormatClass(GoraOutputFormat.class);

        DataStore<String, WebPage> store =
                StorageUtils.createWebStore(this.currentJob.getConfiguration(), String.class, WebPage.class);
        GoraOutputFormat.setOutput(this.currentJob, store, true);

        Class<?> dataStoreClass = StorageUtils.getDataStoreClass(this.currentJob.getConfiguration());
        LOG.info("InjectorJob: Using {} as the Gora storage class.", dataStoreClass);

        // Zero reduce tasks: the mapper output goes directly to the output format.
        this.currentJob.setReducerClass(Reducer.class);
        this.currentJob.setNumReduceTasks(0);

        boolean succeeded = this.currentJob.waitForCompletion(true);
        if (!succeeded) {
            // Previously ignored; surface the failure but keep returning the recorded status.
            LOG.error("InjectorJob: job did not complete successfully");
        }
        ToolUtil.recordJobStatus(null, this.currentJob, this.results);

        long urlsInjected = this.currentJob.getCounters().findCounter("injector", "urls_injected").getValue();
        long urlsFiltered = this.currentJob.getCounters().findCounter("injector", "urls_filtered").getValue();
        LOG.info("InjectorJob: total number of urls rejected by filters: {}", urlsFiltered);
        LOG.info("InjectorJob: total number of urls injected after normalization and filtering: {}", urlsInjected);
        return this.results;
    }

    /**
     * Injects the seed URLs found under {@code urlDir}, logging the start time,
     * the finish time and the elapsed time around the run.
     *
     * @param urlDir location of the seed files; passed to {@link #run(Map)} as
     *               the {@code "seedDir"} argument
     * @throws Exception if the underlying job run fails
     */
    public void inject(Path urlDir) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long start = System.currentTimeMillis();
        LOG.info("InjectorJob: starting at {}", sdf.format(start));
        LOG.info("InjectorJob: Injecting urlDir: {}", urlDir);
        this.run(ToolUtil.toArgMap("seedDir", urlDir));
        long end = System.currentTimeMillis();
        // Same "InjectorJob:" prefix as every other message from this class.
        LOG.info("InjectorJob: finished at {}, elapsed: {}", sdf.format(end), TimingUtil.elapsedTime(start, end));
    }

    /**
     * Command-line entry point: {@code InjectorJob <url_dir> [-crawlId <id>]}.
     *
     * <p>{@code -crawlId <id>} stores {@code id} in the configuration under
     * {@code storage.crawl.id}. Any other option, or {@code -crawlId} without a
     * value, is reported on standard error.
     *
     * @param args command-line arguments; {@code args[0]} is the seed directory
     * @return {@code 0} on success, {@code -1} on a usage error or when the
     *         injection throws
     * @throws Exception declared by the {@link Tool} contract; failures of the
     *                   injection itself are caught, logged and turned into {@code -1}
     */
    @Override
    public int run(String[] args) throws Exception {
        final String usage = "Usage: InjectorJob <url_dir> [-crawlId <id>]";
        if (args.length < 1) {
            System.err.println(usage);
            return -1;
        }
        for (int i = 1; i < args.length; ++i) {
            if ("-crawlId".equals(args[i])) {
                if (i + 1 >= args.length) {
                    // Trailing "-crawlId" with no value used to read past the end of args.
                    System.err.println("Missing value for -crawlId");
                    System.err.println(usage);
                    return -1;
                }
                this.getConf().set("storage.crawl.id", args[++i]);
            } else {
                System.err.println("Unrecognized arg " + args[i]);
                return -1;
            }
        }
        try {
            this.inject(new Path(args[0]));
            return 0;
        } catch (Exception e) {
            LOG.error("InjectorJob: {}", StringUtils.stringifyException(e));
            return -1;
        }
    }

    /**
     * Launches the injector through {@link ToolRunner} and exits the JVM with
     * the status code it returns.
     *
     * @param args command-line arguments forwarded to {@link #run(String[])}
     * @throws Exception if {@link ToolRunner} propagates one
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = NutchConfiguration.create();
        System.exit(ToolRunner.run(conf, new InjectorJob(), args));
    }

    static {
        // Names of the reserved seed-line metadata entries.
        nutchScoreMDName = "nutch.score";
        nutchFetchIntervalMDName = "nutch.fetchInterval";

        // WebPage fields collected in FIELDS.
        FIELDS.add(WebPage.Field.MARKERS);
        FIELDS.add(WebPage.Field.STATUS);
    }

    public static class UrlMapper
    extends Mapper<LongWritable, Text, String, WebPage> {
        private URLNormalizers urlNormalizers;
        private int interval;
        private float scoreInjected;
        private URLFilters filters;
        private ScoringFilters scfilters;
        private long curTime;

        protected void setup(Mapper.Context context) throws IOException, InterruptedException {
            this.urlNormalizers = new URLNormalizers(context.getConfiguration(), "inject");
            this.interval = context.getConfiguration().getInt("db.fetch.interval.default", 2592000);
            this.filters = new URLFilters(context.getConfiguration());
            this.scfilters = new ScoringFilters(context.getConfiguration());
            this.scoreInjected = context.getConfiguration().getFloat("db.score.injected", 1.0f);
            this.curTime = context.getConfiguration().getLong("injector.current.time", System.currentTimeMillis());
        }

        protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
            WebPage row;
            String reversedUrl;
            block17: {
                String url = value.toString().trim();
                if (url != null && (url.length() == 0 || url.startsWith("#"))) {
                    return;
                }
                float customScore = -1.0f;
                int customInterval = this.interval;
                TreeMap<String, String> metadata = new TreeMap<String, String>();
                if (url.indexOf("\t") != -1) {
                    String[] splits = url.split("\t");
                    url = splits[0];
                    for (int s = 1; s < splits.length; ++s) {
                        int indexEquals = splits[s].indexOf("=");
                        if (indexEquals == -1) continue;
                        String metaname = splits[s].substring(0, indexEquals);
                        String metavalue = splits[s].substring(indexEquals + 1);
                        if (metaname.equals(nutchScoreMDName)) {
                            try {
                                customScore = Float.parseFloat(metavalue);
                            }
                            catch (NumberFormatException nfe) {}
                            continue;
                        }
                        if (metaname.equals(nutchFetchIntervalMDName)) {
                            try {
                                customInterval = Integer.parseInt(metavalue);
                            }
                            catch (NumberFormatException nfe) {}
                            continue;
                        }
                        metadata.put(metaname, metavalue);
                    }
                }
                try {
                    url = this.urlNormalizers.normalize(url, "inject");
                    url = this.filters.filter(url);
                }
                catch (Exception e) {
                    LOG.warn("Skipping " + url + ":" + e);
                    url = null;
                }
                if (url == null) {
                    context.getCounter("injector", "urls_filtered").increment(1L);
                    return;
                }
                reversedUrl = TableUtil.reverseUrl(url);
                row = WebPage.newBuilder().build();
                row.setFetchTime(this.curTime);
                row.setFetchInterval(customInterval);
                for (String keymd : metadata.keySet()) {
                    String valuemd = (String)metadata.get(keymd);
                    row.getMetadata().put((CharSequence)new Utf8(keymd), ByteBuffer.wrap(valuemd.getBytes()));
                }
                if (customScore != -1.0f) {
                    row.setScore(Float.valueOf(customScore));
                } else {
                    row.setScore(Float.valueOf(this.scoreInjected));
                }
                try {
                    this.scfilters.injectedScore(url, row);
                }
                catch (ScoringFilterException e) {
                    if (!LOG.isWarnEnabled()) break block17;
                    LOG.warn("Cannot filter injected score for url " + url + ", using default (" + e.getMessage() + ")");
                }
            }
            context.getCounter("injector", "urls_injected").increment(1L);
            row.getMarkers().put((CharSequence)DbUpdaterJob.DISTANCE, (CharSequence)new Utf8(String.valueOf(0)));
            Mark.INJECT_MARK.putMark(row, YES_STRING);
            context.write((Object)reversedUrl, (Object)row);
        }
    }
}

