/*
 * Decompiled with CFR 0.152.
 */
package org.apache.solr.update;

import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.http.HttpResponse;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.Diagnostics;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.StreamingSolrServers;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolrCmdDistributor {
    /** Default upper bound on how many times a single forwarded request is resubmitted. */
    private static final int MAX_RETRIES_ON_FORWARD = 25;
    public static Logger log = LoggerFactory.getLogger(SolrCmdDistributor.class);
    /** Provides the per-node clients used by {@code doRequest} and collects their errors. */
    private StreamingSolrServers servers;
    /** Pause before a failed request is resubmitted, in milliseconds (passed to {@code Thread.sleep}). */
    private int retryPause = 500;
    /** Retry limit actually applied; defaults to {@link #MAX_RETRIES_ON_FORWARD}. */
    private int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD;
    /** Errors that were not (or no longer) retried; this is what {@code getErrors()} exposes. */
    private final List<Error> allErrors = new ArrayList<Error>();
    /**
     * Errors recorded by {@code doRequest} since the last retry pass. Wrapped in a synchronized
     * list because commit requests run {@code doRequest} on the update executor.
     */
    private final List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
    private final ExecutorService updateExecutor;
    /** Runs asynchronous commit requests on {@link #updateExecutor}. */
    private final CompletionService<Object> completionService;
    /** Futures of asynchronous commit requests that have not been reaped yet. */
    private final Set<Future<Object>> pending = new HashSet<Future<Object>>();
    /** Optional hook invoked with each failure's exception during retry processing; null disables it. */
    public static Diagnostics.Callable testing_errorHook;

    public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
        this.servers = new StreamingSolrServers(updateShardHandler);
        this.updateExecutor = updateShardHandler.getUpdateExecutor();
        this.completionService = new ExecutorCompletionService<Object>(this.updateExecutor);
    }

    public SolrCmdDistributor(StreamingSolrServers servers, int maxRetriesOnForward, int retryPause) {
        this.servers = servers;
        this.maxRetriesOnForward = maxRetriesOnForward;
        this.retryPause = retryPause;
        this.updateExecutor = servers.getUpdateExecutor();
        this.completionService = new ExecutorCompletionService<Object>(this.updateExecutor);
    }

    /**
     * Waits for outstanding requests (running any retries that are due) and then shuts down the
     * streaming servers. The shutdown runs even if waiting or retrying throws.
     */
    public void finish() {
        try {
            blockAndDoRetries();
        }
        finally {
            servers.shutdown();
        }
    }

    /**
     * Processes every error recorded so far: each one is either scheduled for resubmission or moved
     * to {@code allErrors}. After both error sources are cleared, the scheduled requests are
     * resubmitted and, if there were any, {@code blockAndDoRetries()} is invoked again.
     */
    private void doRetriesIfNeeded() {
        // Work on a snapshot that merges locally recorded errors with those held by the servers.
        List<Error> failures = new ArrayList<Error>(this.errors);
        failures.addAll(this.servers.getErrors());
        List<Error> toResubmit = new ArrayList<Error>();

        for (Error failure : failures) {
            try {
                // Read the URL before checkRetry(): a node may swap its properties while checking.
                String previousUrl = failure.req.node.getUrl();
                boolean nodeAllowsRetry = failure.req.node.checkRetry();
                int statusCode = failure.statusCode;
                if (testing_errorHook != null) {
                    Diagnostics.call(testing_errorHook, failure.e);
                }

                boolean retryNow = nodeAllowsRetry
                        && isRetriable(statusCode, failure.e)
                        && failure.req.retries < this.maxRetriesOnForward;
                if (!retryNow) {
                    this.allErrors.add(failure);
                    continue;
                }

                failure.req.retries++;
                SolrException.log(log, "forwarding update to " + previousUrl + " failed - retrying ... retries: " + failure.req.retries + " " + failure.req.cmdString + " params:" + failure.req.uReq.getParams() + " rsp:" + statusCode, failure.e);
                try {
                    Thread.sleep(this.retryPause);
                }
                catch (InterruptedException interrupted) {
                    // Restore the flag; the request is still queued for resubmission below.
                    Thread.currentThread().interrupt();
                    log.warn(null, interrupted);
                }
                toResubmit.add(failure);
            }
            catch (Exception unexpected) {
                // Keep going with the remaining errors.
                log.error("Unexpected Error while doing request retries", unexpected);
            }
        }

        this.servers.clearErrors();
        this.errors.clear();

        for (Error failure : toResubmit) {
            submit(failure.req, false);
        }
        if (!toResubmit.isEmpty()) {
            blockAndDoRetries();
        }
    }

    /** Tells whether a failure is of a kind that is retried: status 404/403/503 or a connect failure. */
    private static boolean isRetriable(int statusCode, Exception cause) {
        if (statusCode == 404 || statusCode == 403 || statusCode == 503) {
            return true;
        }
        if (cause instanceof ConnectException) {
            return true;
        }
        return cause instanceof SolrServerException
                && ((SolrServerException)cause).getRootCause() instanceof ConnectException;
    }

    /**
     * Sends a delete to every node without forcing synchronous delivery.
     *
     * @see #distribDelete(DeleteUpdateCommand, List, ModifiableSolrParams, boolean)
     */
    public void distribDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
        distribDelete(cmd, nodes, params, false);
    }

    /**
     * Builds one delete request per node (by id with version, or by query) and submits it.
     *
     * @param cmd    the delete command to forward
     * @param nodes  target nodes
     * @param params request parameters set on every request
     * @param sync   value stored as the {@code synchronous} flag of each submitted request
     */
    public void distribDelete(DeleteUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean sync) throws IOException {
        for (Node target : nodes) {
            UpdateRequest deleteRequest = new UpdateRequest();
            deleteRequest.setParams(params);
            if (!cmd.isDeleteById()) {
                deleteRequest.deleteByQuery(cmd.query);
            } else {
                deleteRequest.deleteById(cmd.getId(), cmd.getVersion());
            }
            Req req = new Req(cmd.toString(), target, deleteRequest, sync);
            submit(req, false);
        }
    }

    /** Sends an add to every node, not synchronous and without a replication tracker. */
    public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
        distribAdd(cmd, nodes, params, false, null);
    }

    /** Sends an add to every node with the given synchronous flag and no replication tracker. */
    public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous) throws IOException {
        distribAdd(cmd, nodes, params, synchronous, null);
    }

    /**
     * Builds one add request per node from the command's document and submits it.
     *
     * @param cmd         the add command; its document, commitWithin and overwrite values are used
     * @param nodes       target nodes
     * @param params      request parameters set on every request
     * @param synchronous value stored as the {@code synchronous} flag of each submitted request
     * @param rrt         tracker attached to each request; may be null
     */
    public void distribAdd(AddUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params, boolean synchronous, DistributedUpdateProcessor.RequestReplicationTracker rrt) throws IOException {
        for (Node target : nodes) {
            UpdateRequest addRequest = new UpdateRequest();
            addRequest.setParams(params);
            addRequest.add(cmd.solrDoc, cmd.commitWithin, cmd.overwrite);
            Req req = new Req(cmd.toString(), target, addRequest, synchronous, rrt);
            submit(req, false);
        }
    }

    /**
     * Waits for outstanding requests and retries, then submits a commit to every node. A single
     * request object is shared by all nodes and each submission is flagged as a commit.
     *
     * @param cmd    commit command whose settings are applied via {@code addCommit}
     * @param nodes  target nodes
     * @param params request parameters
     */
    public void distribCommit(CommitUpdateCommand cmd, List<Node> nodes, ModifiableSolrParams params) throws IOException {
        blockAndDoRetries();

        UpdateRequest commitRequest = new UpdateRequest();
        commitRequest.setParams(params);
        addCommit(commitRequest, cmd);

        log.debug("Distrib commit to: {} params: {}", nodes, params);

        for (Node target : nodes) {
            Req req = new Req(cmd.toString(), target, commitRequest, false);
            submit(req, true);
        }
    }

    /**
     * Blocks until the streaming servers report completion, drains the pending asynchronous
     * futures, and then runs the retry pass. Draining stops early if the thread is interrupted.
     */
    private void blockAndDoRetries() {
        this.servers.blockUntilFinished();

        while (this.pending != null && !this.pending.isEmpty()) {
            Future<Object> completed;
            try {
                completed = this.completionService.take();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("blockAndDoRetries interrupted", e);
                break;
            }
            if (completed == null) {
                // Defensive: nothing to remove, stop draining.
                break;
            }
            this.pending.remove(completed);
        }

        doRetriesIfNeeded();
    }

    /**
     * Copies the commit settings of {@code cmd} onto {@code ureq}; does nothing when {@code cmd}
     * is null. The action is OPTIMIZE when {@code cmd.optimize} is set, otherwise COMMIT.
     */
    void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
        if (cmd != null) {
            AbstractUpdateRequest.ACTION action;
            if (cmd.optimize) {
                action = AbstractUpdateRequest.ACTION.OPTIMIZE;
            } else {
                action = AbstractUpdateRequest.ACTION.COMMIT;
            }
            ureq.setAction(action, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes, cmd.openSearcher);
        }
    }

    /**
     * Dispatches a request. Synchronous requests are sent inline through a dedicated client;
     * commits are handed to the completion service and tracked in {@code pending}; everything else
     * goes straight to {@code doRequest} on the calling thread.
     *
     * @param req      the request to send
     * @param isCommit whether to run the request asynchronously as a commit
     */
    private void submit(final Req req, boolean isCommit) {
        if (req.synchronous) {
            sendSynchronously(req);
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("sending update to " + req.node.getUrl() + " retry:" + req.retries + " " + req.cmdString + " params:" + req.uReq.getParams());
        }

        if (!isCommit) {
            doRequest(req);
            return;
        }

        Callable<Object> commitTask = new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                SolrCmdDistributor.this.doRequest(req);
                return null;
            }
        };
        this.pending.add(this.completionService.submit(commitTask));
    }

    /**
     * Waits for outstanding work, then sends the request through a fresh {@link HttpSolrServer}
     * that is always shut down afterwards. Any failure is rethrown as a SERVER_ERROR SolrException.
     */
    private void sendSynchronously(Req req) {
        blockAndDoRetries();
        HttpSolrServer client = new HttpSolrServer(req.node.getUrl(), this.servers.getHttpClient());
        try {
            client.request(req.uReq);
        }
        catch (Exception e) {
            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed synchronous update on shard " + req.node + " update: " + req.uReq, (Throwable)e);
        }
        finally {
            client.shutdown();
        }
    }

    /**
     * Sends the request through the client the servers provide for it. A failure is logged and
     * recorded in {@code errors} rather than thrown; the status code is captured only when the
     * exception is a {@link SolrException}.
     */
    private void doRequest(Req req) {
        try {
            this.servers.getSolrServer(req).request(req.uReq);
        }
        catch (Exception e) {
            SolrException.log(log, e);

            Error failure = new Error();
            failure.req = req;
            failure.e = e;
            if (e instanceof SolrException) {
                SolrException solrFailure = (SolrException)e;
                failure.statusCode = solrFailure.code();
            }
            this.errors.add(failure);
        }
    }

    /**
     * Returns the errors that were not retried or ran out of retries.
     *
     * @return the internal list itself, not a copy
     */
    public List<Error> getErrors() {
        return allErrors;
    }

    /**
     * A node whose requests may be retried. Before each retry it looks up the current leader of its
     * collection/shard and, on success, redirects itself to that leader.
     */
    public static class RetryNode
    extends StdNode {
        private final ZkStateReader zkStateReader;

        /**
         * @param nodeProps     properties of the node initially targeted
         * @param zkStateReader used to look up the shard leader when a retry is considered
         * @param collection    collection the shard belongs to
         * @param shardId       id of the shard whose leader is targeted
         */
        public RetryNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId) {
            // The superclass constructor already stores collection and shardId.
            super(nodeProps, collection, shardId);
            this.zkStateReader = zkStateReader;
        }

        /**
         * Refreshes the target to the current shard leader.
         *
         * @return {@code false} if interrupted while looking up the leader (the interrupt flag is
         *         restored); {@code true} otherwise, including when the lookup failed and the
         *         previous target is kept
         */
        @Override
        public boolean checkRetry() {
            ZkCoreNodeProps leaderProps;
            try {
                leaderProps = new ZkCoreNodeProps(this.zkStateReader.getLeaderRetry(this.collection, this.shardId));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            catch (Exception e) {
                // Lookup failed: retry against the node properties we already have.
                log.warn(null, e);
                return true;
            }
            this.nodeProps = leaderProps;
            return true;
        }

        /**
         * Uses exactly the fields that {@link #equals(Object)} compares. Mixing in collection and
         * shardId, which equality ignores, would let two equal nodes hash differently and break
         * hash-based collections.
         */
        @Override
        public int hashCode() {
            return super.hashCode();
        }

        /**
         * Two retry nodes are equal when they have the same runtime class and the same base URL,
         * core name and core URL; all of these checks are performed by the superclass.
         */
        @Override
        public boolean equals(Object obj) {
            return super.equals(obj);
        }
    }

    /**
     * Node backed by a {@link ZkCoreNodeProps}, optionally tagged with a collection and shard id.
     * It never asks for a retry. Equality and hashing use the base URL, core name and core URL.
     */
    public static class StdNode
    extends Node {
        protected ZkCoreNodeProps nodeProps;
        protected String collection;
        protected String shardId;

        /** Creates a node with no collection or shard id. */
        public StdNode(ZkCoreNodeProps nodeProps) {
            this(nodeProps, null, null);
        }

        public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId) {
            this.nodeProps = nodeProps;
            this.collection = collection;
            this.shardId = shardId;
        }

        @Override
        public String getCollection() {
            return collection;
        }

        @Override
        public String getShardId() {
            return shardId;
        }

        /** Returns the core URL of the underlying node properties. */
        @Override
        public String getUrl() {
            return nodeProps.getCoreUrl();
        }

        /** Renders as the simple class name followed by the core URL. */
        @Override
        public String toString() {
            String typeName = getClass().getSimpleName();
            return typeName + ": " + nodeProps.getCoreUrl();
        }

        /** Always {@code false}: a standard node is never retried. */
        @Override
        public boolean checkRetry() {
            return false;
        }

        @Override
        public String getBaseUrl() {
            return nodeProps.getBaseUrl();
        }

        @Override
        public String getCoreName() {
            return nodeProps.getCoreName();
        }

        /** Combines the hashes of base URL, core name and core URL; a null part contributes 0. */
        @Override
        public int hashCode() {
            String[] parts = {nodeProps.getBaseUrl(), nodeProps.getCoreName(), nodeProps.getCoreUrl()};
            int hash = 1;
            for (String part : parts) {
                hash = 31 * hash + (part == null ? 0 : part.hashCode());
            }
            return hash;
        }

        /** Equal when the other object has the same class and the same base URL, core name and core URL. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ZkCoreNodeProps mine = this.nodeProps;
            ZkCoreNodeProps theirs = ((StdNode)obj).nodeProps;
            return sameText(mine.getBaseUrl(), theirs.getBaseUrl())
                    && sameText(mine.getCoreName(), theirs.getCoreName())
                    && sameText(mine.getCoreUrl(), theirs.getCoreUrl());
        }

        /** Null-safe string equality: two nulls match, a null never matches a non-null. */
        private static boolean sameText(String left, String right) {
            return left == null ? right == null : left.equals(right);
        }

        @Override
        public ZkCoreNodeProps getNodeProps() {
            return nodeProps;
        }
    }

    /**
     * Abstraction of a target that update requests are sent to. The distributor reads the URL for
     * sending and logging, and consults {@link #checkRetry()} when a request to the node has failed.
     */
    public static abstract class Node {
        /** URL that requests for this node are sent to. */
        public abstract String getUrl();

        /**
         * Called during retry processing for a failed request to this node.
         *
         * @return {@code true} if the request may be considered for a retry, {@code false} if the
         *         failure should be reported without retrying
         */
        public abstract boolean checkRetry();

        /** Name of the core on the node. */
        public abstract String getCoreName();

        /** Base URL of the node. */
        public abstract String getBaseUrl();

        /** The node properties this node is backed by. */
        public abstract ZkCoreNodeProps getNodeProps();

        /** Collection associated with the node; implementations in this file may return null. */
        public abstract String getCollection();

        /** Shard id associated with the node; implementations in this file may return null. */
        public abstract String getShardId();
    }

    /**
     * Record of one failed request: the exception, the request that failed, and the status code
     * when one is known ({@code -1} otherwise).
     */
    public static class Error {
        public Exception e;
        public int statusCode = -1;
        public Req req;

        /** Renders status code, exception and request; null members print as {@code "null"}. */
        @Override
        public String toString() {
            return "SolrCmdDistributor$Error: statusCode=" + this.statusCode
                    + "; exception=" + this.e
                    + "; req=" + this.req;
        }
    }

    /** Simple holder for a list of {@link Error}s. */
    public static class Response {
        /** Collected errors; starts out as an empty, mutable list. */
        public List<Error> errors = new ArrayList<Error>();
    }

    /**
     * One update request bound to a target node, together with its retry count, synchronous flag,
     * a description of the originating command and an optional replication-factor tracker.
     */
    public static class Req {
        public Node node;
        public UpdateRequest uReq;
        public int retries;
        public boolean synchronous;
        public String cmdString;
        public DistributedUpdateProcessor.RequestReplicationTracker rfTracker;

        /** Creates a request without a replication-factor tracker. */
        public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous) {
            this(cmdString, node, uReq, synchronous, null);
        }

        public Req(String cmdString, Node node, UpdateRequest uReq, boolean synchronous, DistributedUpdateProcessor.RequestReplicationTracker rfTracker) {
            this.cmdString = cmdString;
            this.node = node;
            this.uReq = uReq;
            this.synchronous = synchronous;
            this.rfTracker = rfTracker;
        }

        /** Renders the command description and the node; null members print as {@code "null"}. */
        @Override
        public String toString() {
            return "SolrCmdDistributor$Req: cmd=" + this.cmdString + "; node=" + this.node;
        }

        /**
         * Reports the outcome of this request to the tracker, if one is attached. When a response
         * is available, the {@code rf} value of its {@code responseHeader} is passed along;
         * otherwise, or if it cannot be read, {@code null} is passed.
         *
         * @param resp    HTTP response of the request; may be null
         * @param success whether the request succeeded
         */
        public void trackRequestResult(HttpResponse resp, boolean success) {
            if (this.rfTracker == null) {
                return;
            }
            Integer rf = null;
            if (resp != null) {
                rf = readReplicationFactor(resp);
            }
            this.rfTracker.trackRequestResult(this.node, success, rf);
        }

        /**
         * Parses the binary response body and extracts an Integer {@code rf} entry from its
         * {@code responseHeader}. Returns null if the entry is absent, has another type, or the
         * body cannot be parsed (which is logged as a warning). The stream is always closed.
         */
        private Integer readReplicationFactor(HttpResponse resp) {
            InputStream body = null;
            try {
                body = resp.getEntity().getContent();
                BinaryResponseParser parser = new BinaryResponseParser();
                NamedList<Object> parsed = parser.processResponse(body, null);
                Object header = parsed.get("responseHeader");
                if (header instanceof NamedList) {
                    Object rfValue = ((NamedList<?>)header).get("rf");
                    if (rfValue instanceof Integer) {
                        return (Integer)rfValue;
                    }
                }
                return null;
            }
            catch (Exception e) {
                log.warn("Failed to parse response from " + this.node + " during replication factor accounting due to: " + e);
                return null;
            }
            finally {
                if (body != null) {
                    try {
                        body.close();
                    }
                    catch (Exception ignore) {
                        // Best effort: a failure to close must not affect the tracking result.
                    }
                }
            }
        }
    }

    /**
     * Single-method callback returning a boolean. It is not referenced elsewhere in this file.
     */
    public static interface AbortCheck {
        /** NOTE(review): presumably {@code true} means the operation should be aborted; confirm against callers. */
        public boolean abortCheck();
    }
}

