package org.janusgraph.graphdb.olap.computer;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
import org.janusgraph.core.JanusGraphComputer;
import org.janusgraph.core.JanusGraphException;
import org.janusgraph.core.JanusGraphTransaction;
import org.janusgraph.core.JanusGraphVertex;
import org.janusgraph.core.Transaction;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.diskstorage.configuration.Configuration;
import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanMetrics;
import org.janusgraph.diskstorage.keycolumnvalue.scan.StandardScanner;
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
import org.janusgraph.graphdb.database.StandardJanusGraph;
import org.janusgraph.graphdb.olap.computer.VertexMapJob;
import org.janusgraph.graphdb.olap.computer.VertexProgramScanJob;
import org.janusgraph.graphdb.util.WorkerPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/janusgraph-core-0.3.0.jar:org/janusgraph/graphdb/olap/computer/FulgoraGraphComputer.class */
public class FulgoraGraphComputer implements JanusGraphComputer {
    private VertexProgram<?> vertexProgram;
    private final StandardJanusGraph graph;
    private FulgoraMemory memory;
    private FulgoraVertexMemory vertexMemory;
    private final int readBatchSize;
    private final int writeBatchSize;
    private String jobId;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) FulgoraGraphComputer.class);
    private static final AtomicInteger computerCounter = new AtomicInteger(0);
    private final Set<MapReduce> mapReduces = new HashSet();
    private final int expectedNumVertices = 10000;
    private boolean executed = false;
    private int numThreads = 1;
    private GraphComputer.ResultGraph resultGraphMode = null;
    private GraphComputer.Persist persistMode = null;
    private final GraphFilter graphFilter = new GraphFilter();
    private final String name = "compute" + computerCounter.incrementAndGet();

    /* loaded from: input_file:WEB-INF/lib/janusgraph-core-0.3.0.jar:org/janusgraph/graphdb/olap/computer/FulgoraGraphComputer$VertexPropertyWriter.class */
    private class VertexPropertyWriter implements Runnable {
        private final List<Map.Entry<Long, Map<String, Object>>> properties;
        private final AtomicInteger failures;
        static final /* synthetic */ boolean $assertionsDisabled;

        private VertexPropertyWriter(List<Map.Entry<Long, Map<String, Object>>> list, AtomicInteger atomicInteger) {
            if (!$assertionsDisabled && (list == null || list.isEmpty() || atomicInteger == null)) {
                throw new AssertionError();
            }
            this.properties = list;
            this.failures = atomicInteger;
        }

        @Override // java.lang.Runnable
        public void run() {
            JanusGraphTransaction start = FulgoraGraphComputer.this.graph.buildTransaction().enableBatchLoading().start();
            try {
                try {
                    for (Map.Entry<Long, Map<String, Object>> entry : this.properties) {
                        JanusGraphVertex vertex = start.getVertex(entry.getKey().longValue());
                        if (vertex != null) {
                            for (Map.Entry<String, Object> entry2 : entry.getValue().entrySet()) {
                                vertex.property(VertexProperty.Cardinality.single, entry2.getKey(), (String) entry2.getValue(), new Object[0]);
                            }
                        }
                    }
                    start.commit();
                    if (start == null || !start.isOpen()) {
                        return;
                    }
                    start.rollback();
                } catch (Throwable th) {
                    this.failures.incrementAndGet();
                    FulgoraGraphComputer.log.error("Encountered exception while trying to write properties: ", th);
                    if (start == null || !start.isOpen()) {
                        return;
                    }
                    start.rollback();
                }
            } catch (Throwable th2) {
                if (start != null && start.isOpen()) {
                    start.rollback();
                }
                throw th2;
            }
        }

        static {
            $assertionsDisabled = !FulgoraGraphComputer.class.desiredAssertionStatus();
        }
    }

    public FulgoraGraphComputer(StandardJanusGraph standardJanusGraph, Configuration configuration) {
        this.graph = standardJanusGraph;
        this.writeBatchSize = ((Integer) configuration.get(GraphDatabaseConfiguration.BUFFER_SIZE, new String[0])).intValue();
        this.readBatchSize = this.writeBatchSize * 10;
    }

    @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer
    public GraphComputer vertices(Traversal<Vertex, Vertex> traversal) {
        this.graphFilter.setVertexFilter(traversal);
        return this;
    }

    @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer
    public GraphComputer edges(Traversal<Vertex, Edge> traversal) {
        this.graphFilter.setEdgeFilter(traversal);
        return this;
    }

    @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer
    public GraphComputer result(GraphComputer.ResultGraph resultGraph) {
        Preconditions.checkArgument(resultGraph != null, "Need to specify mode");
        this.resultGraphMode = resultGraph;
        return this;
    }

    @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer
    public GraphComputer persist(GraphComputer.Persist persist) {
        Preconditions.checkArgument(persist != null, "Need to specify mode");
        this.persistMode = persist;
        return this;
    }

    @Override // org.janusgraph.core.JanusGraphComputer, org.apache.tinkerpop.gremlin.process.computer.GraphComputer
    public JanusGraphComputer workers(int i) {
        Preconditions.checkArgument(i > 0, "Invalid number of threads: %s", Integer.valueOf(i));
        this.numThreads = i;
        return this;
    }

    @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer
    public GraphComputer program(VertexProgram vertexProgram) {
        Preconditions.checkState(this.vertexProgram == null, "A vertex program has already been set");
        this.vertexProgram = vertexProgram;
        return this;
    }

    @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer
    public GraphComputer mapReduce(MapReduce mapReduce) {
        this.mapReduces.add(mapReduce);
        return this;
    }

    @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer
    public Future<ComputerResult> submit() {
        if (this.executed) {
            throw GraphComputer.Exceptions.computerHasAlreadyBeenSubmittedAVertexProgram();
        }
        this.executed = true;
        if (null == this.vertexProgram && this.mapReduces.isEmpty()) {
            throw GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers();
        }
        if (null != this.vertexProgram) {
            GraphComputerHelper.validateProgramOnComputer(this, this.vertexProgram);
            this.mapReduces.addAll(this.vertexProgram.getMapReducers());
        }
        this.persistMode = GraphComputerHelper.getPersistState(Optional.ofNullable(this.vertexProgram), Optional.ofNullable(this.persistMode));
        this.resultGraphMode = GraphComputerHelper.getResultGraphState(Optional.ofNullable(this.vertexProgram), Optional.ofNullable(this.resultGraphMode));
        if (!features().supportsResultGraphPersistCombination(this.resultGraphMode, this.persistMode)) {
            throw GraphComputer.Exceptions.resultGraphPersistCombinationNotSupported(this.resultGraphMode, this.persistMode);
        }
        if (this.numThreads > features().getMaxWorkers()) {
            throw GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported(this.numThreads, features().getMaxWorkers());
        }
        this.memory = new FulgoraMemory(this.vertexProgram, this.mapReduces);
        return CompletableFuture.supplyAsync(() -> {
            WorkerPool workerPool;
            Throwable th;
            Throwable th2;
            Map.Entry entry;
            long currentTimeMillis = System.currentTimeMillis();
            if (null != this.vertexProgram) {
                this.vertexMemory = new FulgoraVertexMemory(10000, this.graph.getIDManager(), this.vertexProgram);
                this.vertexProgram.setup(this.memory);
                VertexProgramScanJob.Executor vertexProgramScanJob = VertexProgramScanJob.getVertexProgramScanJob(this.graph, this.memory, this.vertexMemory, this.vertexProgram);
                Throwable th3 = null;
                int i = 1;
                while (true) {
                    try {
                        this.memory.completeSubRound();
                        this.vertexMemory.nextIteration(this.vertexProgram.getMessageScopes(this.memory));
                        this.jobId = this.name + "#" + i;
                        StandardScanner.Builder buildEdgeScanJob = this.graph.getBackend().buildEdgeScanJob();
                        buildEdgeScanJob.setJobId(this.jobId);
                        buildEdgeScanJob.setNumProcessingThreads(this.numThreads);
                        buildEdgeScanJob.setWorkBlockSize(this.readBatchSize);
                        buildEdgeScanJob.setJob(vertexProgramScanJob);
                        PartitionedVertexProgramExecutor partitionedVertexProgramExecutor = new PartitionedVertexProgramExecutor(this.graph, this.memory, this.vertexMemory, this.vertexProgram);
                        try {
                            ScanMetrics scanMetrics = buildEdgeScanJob.execute().get();
                            long j = scanMetrics.get(ScanMetrics.Metric.FAILURE);
                            if (j > 0) {
                                throw new JanusGraphException("Failed to process [" + j + "] vertices in vertex program iteration [" + i + "]. Computer is aborting.");
                            }
                            partitionedVertexProgramExecutor.run(this.numThreads, scanMetrics);
                            long custom = scanMetrics.getCustom(PartitionedVertexProgramExecutor.PARTITION_VERTEX_POSTFAIL);
                            if (custom > 0) {
                                throw new JanusGraphException("Failed to process [" + custom + "] partitioned vertices in vertex program iteration [" + i + "]. Computer is aborting.");
                            }
                            this.vertexMemory.completeIteration();
                            this.memory.completeSubRound();
                            try {
                                if (this.vertexProgram.terminate(this.memory)) {
                                    this.memory.incrIteration();
                                } else {
                                    this.memory.incrIteration();
                                    i++;
                                }
                            } catch (Throwable th4) {
                                this.memory.incrIteration();
                                throw th4;
                            }
                        } catch (Exception e) {
                            throw new JanusGraphException(e);
                        }
                    } finally {
                        if (vertexProgramScanJob != null) {
                            if (0 != 0) {
                                try {
                                    vertexProgramScanJob.close();
                                } catch (Throwable th5) {
                                    th3.addSuppressed(th5);
                                }
                            } else {
                                vertexProgramScanJob.close();
                            }
                        }
                    }
                }
            }
            HashMap hashMap = new HashMap(this.mapReduces.size());
            for (MapReduce mapReduce : this.mapReduces) {
                if (mapReduce.doStage(MapReduce.Stage.MAP)) {
                    hashMap.put(mapReduce, new FulgoraMapEmitter(mapReduce.doStage(MapReduce.Stage.REDUCE)));
                }
            }
            this.jobId = this.name + "#map";
            VertexMapJob.Executor vertexMapJob = VertexMapJob.getVertexMapJob(this.graph, this.vertexMemory, hashMap);
            Throwable th6 = null;
            try {
                StandardScanner.Builder buildEdgeScanJob2 = this.graph.getBackend().buildEdgeScanJob();
                buildEdgeScanJob2.setJobId(this.jobId);
                buildEdgeScanJob2.setNumProcessingThreads(this.numThreads);
                buildEdgeScanJob2.setWorkBlockSize(this.readBatchSize);
                buildEdgeScanJob2.setJob(vertexMapJob);
                try {
                    ScanMetrics scanMetrics2 = buildEdgeScanJob2.execute().get();
                    long j2 = scanMetrics2.get(ScanMetrics.Metric.FAILURE);
                    if (j2 > 0) {
                        throw new JanusGraphException("Failed to process [" + j2 + "] vertices in map phase. Computer is aborting.");
                    }
                    long custom2 = scanMetrics2.getCustom(VertexMapJob.MAP_JOB_FAILURE);
                    if (custom2 > 0) {
                        throw new JanusGraphException("Failed to process [" + custom2 + "] individual map jobs. Computer is aborting.");
                    }
                    for (Map.Entry entry2 : hashMap.entrySet()) {
                        FulgoraMapEmitter fulgoraMapEmitter = (FulgoraMapEmitter) entry2.getValue();
                        MapReduce mapReduce2 = (MapReduce) entry2.getKey();
                        fulgoraMapEmitter.complete(mapReduce2);
                        if (mapReduce2.doStage(MapReduce.Stage.REDUCE)) {
                            FulgoraReduceEmitter fulgoraReduceEmitter = new FulgoraReduceEmitter();
                            try {
                                workerPool = new WorkerPool(this.numThreads);
                                th = null;
                                try {
                                    try {
                                        workerPool.submit(() -> {
                                            mapReduce2.workerStart(MapReduce.Stage.REDUCE);
                                        });
                                        Iterator it2 = fulgoraMapEmitter.reduceMap.entrySet().iterator();
                                        while (it2.hasNext() && null != (entry = (Map.Entry) it2.next())) {
                                            workerPool.submit(() -> {
                                                mapReduce2.reduce(entry.getKey(), ((Iterable) entry.getValue()).iterator(), fulgoraReduceEmitter);
                                            });
                                        }
                                        workerPool.submit(() -> {
                                            mapReduce2.workerEnd(MapReduce.Stage.REDUCE);
                                        });
                                        if (workerPool != null) {
                                            if (0 != 0) {
                                                try {
                                                    workerPool.close();
                                                } catch (Throwable th7) {
                                                    th.addSuppressed(th7);
                                                }
                                            } else {
                                                workerPool.close();
                                            }
                                        }
                                        fulgoraReduceEmitter.complete(mapReduce2);
                                        mapReduce2.addResultToMemory(this.memory, fulgoraReduceEmitter.reduceQueue.iterator());
                                    } finally {
                                    }
                                } finally {
                                    if (workerPool != null) {
                                        if (th2 != null) {
                                            try {
                                                workerPool.close();
                                            } catch (Throwable th8) {
                                                th.addSuppressed(th8);
                                            }
                                        }
                                    }
                                }
                            } catch (Exception e2) {
                                throw new JanusGraphException("Exception while executing reduce phase", e2);
                            }
                        } else {
                            mapReduce2.addResultToMemory(this.memory, fulgoraMapEmitter.mapQueue.iterator());
                        }
                    }
                    this.memory.attachReferenceElements(this.graph);
                    Transaction transaction = this.graph;
                    if (this.persistMode == GraphComputer.Persist.NOTHING && this.resultGraphMode == GraphComputer.ResultGraph.NEW) {
                        transaction = EmptyGraph.instance();
                    } else if (this.persistMode != GraphComputer.Persist.NOTHING && this.vertexProgram != null && !this.vertexProgram.getVertexComputeKeys().isEmpty()) {
                        JanusGraphManagement openManagement = this.graph.openManagement();
                        try {
                            for (VertexComputeKey vertexComputeKey : this.vertexProgram.getVertexComputeKeys()) {
                                if (!openManagement.containsPropertyKey(vertexComputeKey.getKey())) {
                                    log.warn("Property key [{}] is not part of the schema and will be created. It is advised to initialize all keys.", vertexComputeKey.getKey());
                                }
                                openManagement.getOrCreatePropertyKey(vertexComputeKey.getKey());
                            }
                            openManagement.commit();
                            if (openManagement != null && openManagement.isOpen()) {
                                openManagement.rollback();
                            }
                            Map transformValues = Maps.transformValues(this.vertexMemory.getMutableVertexProperties(), new Function<Map<String, Object>, Map<String, Object>>() { // from class: org.janusgraph.graphdb.olap.computer.FulgoraGraphComputer.1
                                @Override // com.google.common.base.Function
                                @Nullable
                                public Map<String, Object> apply(Map<String, Object> map) {
                                    return Maps.filterKeys(map, str -> {
                                        return !VertexProgramHelper.isTransientVertexComputeKey(str, FulgoraGraphComputer.this.vertexProgram.getVertexComputeKeys());
                                    });
                                }
                            });
                            if (this.resultGraphMode == GraphComputer.ResultGraph.ORIGINAL) {
                                AtomicInteger atomicInteger = new AtomicInteger(0);
                                try {
                                    workerPool = new WorkerPool(this.numThreads);
                                    Throwable th9 = null;
                                    try {
                                        try {
                                            ArrayList arrayList = new ArrayList(this.writeBatchSize / this.vertexProgram.getVertexComputeKeys().size());
                                            int i2 = 0;
                                            for (Map.Entry entry3 : transformValues.entrySet()) {
                                                arrayList.add(entry3);
                                                i2 += ((Map) entry3.getValue()).size();
                                                if (i2 >= this.writeBatchSize) {
                                                    workerPool.submit(new VertexPropertyWriter(arrayList, atomicInteger));
                                                    arrayList = new ArrayList(arrayList.size());
                                                    i2 = 0;
                                                }
                                            }
                                            if (!arrayList.isEmpty()) {
                                                workerPool.submit(new VertexPropertyWriter(arrayList, atomicInteger));
                                            }
                                            if (workerPool != null) {
                                                if (0 != 0) {
                                                    try {
                                                        workerPool.close();
                                                    } catch (Throwable th10) {
                                                        th9.addSuppressed(th10);
                                                    }
                                                } else {
                                                    workerPool.close();
                                                }
                                            }
                                            if (atomicInteger.get() > 0) {
                                                throw new JanusGraphException("Could not persist program results to graph. Check log for details.");
                                            }
                                        } finally {
                                        }
                                    } finally {
                                        if (workerPool != null) {
                                            if (th2 != null) {
                                                try {
                                                    workerPool.close();
                                                } catch (Throwable th11) {
                                                    th.addSuppressed(th11);
                                                }
                                            }
                                        }
                                    }
                                } catch (Exception e3) {
                                    throw new JanusGraphException("Exception while attempting to persist result into graph", e3);
                                }
                            } else if (this.resultGraphMode == GraphComputer.ResultGraph.NEW) {
                                transaction = this.graph.newTransaction();
                                for (Map.Entry entry4 : transformValues.entrySet()) {
                                    Vertex next = transaction.vertices(entry4.getKey()).next();
                                    for (Map.Entry entry5 : ((Map) entry4.getValue()).entrySet()) {
                                        if (entry5.getValue() instanceof List) {
                                            ((List) entry5.getValue()).forEach(obj -> {
                                                next.property(VertexProperty.Cardinality.list, (String) entry5.getKey(), obj, new Object[0]);
                                            });
                                        } else {
                                            next.property(VertexProperty.Cardinality.single, (String) entry5.getKey(), entry5.getValue(), new Object[0]);
                                        }
                                    }
                                }
                            }
                        } catch (Throwable th12) {
                            if (openManagement != null && openManagement.isOpen()) {
                                openManagement.rollback();
                            }
                            throw th12;
                        }
                    }
                    this.memory.setRuntime(System.currentTimeMillis() - currentTimeMillis);
                    this.memory.complete();
                    return new DefaultComputerResult(transaction, this.memory);
                } catch (Exception e4) {
                    throw new JanusGraphException(e4);
                }
            } finally {
                if (vertexMapJob != null) {
                    if (0 != 0) {
                        try {
                            vertexMapJob.close();
                        } catch (Throwable th13) {
                            th6.addSuppressed(th13);
                        }
                    } else {
                        vertexMapJob.close();
                    }
                }
            }
        });
    }

    public String toString() {
        return StringFactory.graphComputerString(this);
    }

    @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer
    public GraphComputer.Features features() {
        return new GraphComputer.Features() { // from class: org.janusgraph.graphdb.olap.computer.FulgoraGraphComputer.2
            @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer.Features
            public boolean supportsVertexAddition() {
                return false;
            }

            @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer.Features
            public boolean supportsVertexRemoval() {
                return false;
            }

            @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer.Features
            public boolean supportsVertexPropertyAddition() {
                return true;
            }

            @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer.Features
            public boolean supportsVertexPropertyRemoval() {
                return false;
            }

            @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer.Features
            public boolean supportsEdgeAddition() {
                return false;
            }

            @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer.Features
            public boolean supportsEdgeRemoval() {
                return false;
            }

            @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer.Features
            public boolean supportsEdgePropertyAddition() {
                return false;
            }

            @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer.Features
            public boolean supportsEdgePropertyRemoval() {
                return false;
            }

            @Override // org.apache.tinkerpop.gremlin.process.computer.GraphComputer.Features
            public boolean supportsGraphFilter() {
                return false;
            }
        };
    }
}
