/*
 * Decompiled with CFR 0.152.
 */
package com.findwise.hydra.stage;

import com.findwise.hydra.JsonException;
import com.findwise.hydra.local.LocalDocument;
import com.findwise.hydra.local.RemotePipeline;
import com.findwise.hydra.stage.AbstractStage;
import com.findwise.hydra.stage.Parameter;
import com.findwise.hydra.stage.ProcessException;
import java.io.IOException;
import java.util.Map;
import org.apache.http.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for stages that repeatedly fetch a document from the
 * {@link RemotePipeline}, hand it to {@link #process(LocalDocument)} and then
 * write the result (or the processing error) back.
 *
 * <p>The fetch/process/persist loop lives in {@link #run()}; subclasses only
 * supply {@link #process(LocalDocument)}.
 */
public abstract class AbstractProcessStage
extends AbstractStage {
    Logger logger = LoggerFactory.getLogger(AbstractProcessStage.class);

    // Not final: the @Parameter annotation suggests the value is assigned from
    // outside this class (nothing in this class writes to it).
    @Parameter(description="If set, indicates that the document being processed should be FAILED if a ProcessException is thrown by the stage. If not set, the error will only be persisted and the document written back to Hydra.")
    private boolean failDocumentOnProcessException = false;

    // NOTE(review): not referenced inside this class; presumably the number of
    // leading command-line arguments reserved by the stage launcher - confirm.
    public static final int NUM_RESERVED_ARGUMENTS = 3;

    /** Milliseconds to sleep in {@link #run()} when {@link #fetch()} returns no document. */
    private long holdInterval = 2000L;

    /**
     * Asks the remote pipeline for a document matching this stage's query.
     *
     * @return the fetched document; {@link #run()} treats {@code null} as
     *         "nothing to process right now"
     * @throws ParseException if thrown by the remote pipeline call
     * @throws IOException if thrown by the remote pipeline call
     * @throws JsonException if thrown by the remote pipeline call
     */
    protected LocalDocument fetch() throws ParseException, IOException, JsonException {
        return this.getRemotePipeline().getDocument(this.getQuery());
    }

    /**
     * Saves the remote pipeline's current document.
     *
     * @return the result of {@code RemotePipeline.saveCurrentDocument()};
     *         {@link #run()} treats {@code false} as a failed save
     * @throws IOException if thrown by the remote pipeline call
     * @throws JsonException if thrown by the remote pipeline call
     */
    protected boolean persist() throws IOException, JsonException {
        this.logger.debug("Saving document to RemotePipeline..");
        return this.getRemotePipeline().saveCurrentDocument();
    }

    /**
     * Logs the given error, records it on the document under this stage's
     * name and then saves the remote pipeline's current document.
     *
     * @param d the document the error is attached to
     * @param e the error that occurred during processing
     * @return the result of {@code RemotePipeline.saveCurrentDocument()}
     * @throws IOException if thrown by the remote pipeline call
     * @throws JsonException if thrown by the remote pipeline call
     */
    protected boolean persistError(LocalDocument d, Exception e) throws IOException, JsonException {
        this.logger.error("Trying to release document due to error in processing", e);
        d.addError(this.getStageName(), e);
        return this.getRemotePipeline().saveCurrentDocument();
    }

    /**
     * Processes a single document. Called by {@link #run()} once for every
     * non-null document returned by {@link #fetch()}.
     *
     * @param doc the document to process
     * @throws ProcessException if the document could not be processed; the
     *         run loop then either marks the document as failed or persists
     *         the error, depending on {@code failDocumentOnProcessException}
     */
    public abstract void process(LocalDocument doc) throws ProcessException;

    /**
     * Delegates to {@code AbstractStage.setUp} without adding any behaviour.
     *
     * @param rp the remote pipeline, passed through to the superclass
     * @param properties the stage properties, passed through to the superclass
     */
    @Override
    public void setUp(RemotePipeline rp, Map<String, Object> properties) throws IllegalArgumentException, IllegalAccessException, IOException {
        super.setUp(rp, properties);
    }

    /**
     * Runs the fetch/process/persist loop for as long as
     * {@code isContinueRunning()} returns {@code true}.
     *
     * <p>Per iteration:
     * <ul>
     *   <li>no document available: sleep for {@code holdInterval} ms and retry;</li>
     *   <li>processing succeeds: persist the document; if that fails, try to
     *       mark a copy of the document as failed;</li>
     *   <li>{@link ProcessException}: mark the document as failed or persist
     *       the error, depending on {@code failDocumentOnProcessException}.</li>
     * </ul>
     *
     * <p>Any other exception (including an interrupt during the sleep) is
     * treated as fatal: it is logged, the stage's shutdown hook is removed and
     * the JVM exits with status 1.
     */
    @Override
    public void run() {
        this.setContinueRunning(true);
        while (this.isContinueRunning()) {
            try {
                LocalDocument doc = this.fetch();
                if (doc == null) {
                    Thread.sleep(this.holdInterval);
                    continue;
                }
                try {
                    // Parameterized message: no string building when debug is off.
                    this.logger.debug("Got new doc {} to process.", doc.getID());
                    this.process(doc);
                    if (!this.persist()) {
                        // Saving failed; fall back to marking a copy of the document as failed.
                        LocalDocument copy = new LocalDocument(doc.toJson());
                        IOException saveFailure = new IOException("Unable to save changes to core");
                        if (!this.getRemotePipeline().markFailed(copy, saveFailure)) {
                            this.logger.error("Unable to persist an error to the database", saveFailure);
                        }
                    }
                }
                catch (ProcessException e) {
                    // Previously the results of these calls were discarded, so a
                    // failed write-back of the error went completely unnoticed.
                    if (this.failDocumentOnProcessException) {
                        if (!this.getRemotePipeline().markFailed(doc, e)) {
                            this.logger.error("Unable to mark document " + doc.getID() + " as failed", e);
                        }
                    } else if (!this.persistError(doc, e)) {
                        this.logger.error("Unable to persist processing error for document " + doc.getID(), e);
                    }
                }
            }
            catch (Exception e) {
                this.logger.error("Caught exception while running", e);
                // Remove the hook before exiting so it does not run as part of this forced exit.
                Runtime.getRuntime().removeShutdownHook(this.getShutDownHook());
                System.exit(1);
            }
        }
    }
}

