/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.test.randomwalk.shard;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.Base64;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.test.randomwalk.Environment;
import org.apache.accumulo.test.randomwalk.State;
import org.apache.accumulo.test.randomwalk.Test;
import org.apache.accumulo.test.randomwalk.shard.Insert;
import org.apache.accumulo.test.randomwalk.shard.SortTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Random walk test node that writes a batch of randomly generated documents into two
 * sequence files (data and index), sorts each file with a {@link SortTool} map/reduce job
 * and bulk imports the sorted output into the corresponding tables.
 *
 * <p>All intermediate files live under {@code /tmp/shard_bulk/<docTableName>}; that directory
 * is removed once both imports have completed.
 */
public class BulkInsert extends Test {

    /** Pause between two bulk import attempts, in milliseconds. */
    private static final long RETRY_SLEEP_MILLIS = 3000L;

    /**
     * Generates the documents, sorts them and bulk imports them.
     *
     * <p>Reads {@code indexTableName}, {@code docTableName}, {@code numPartitions},
     * {@code rand} and {@code nextDocID} from {@code state}, and {@code minInsert},
     * {@code maxInsert} and {@code maxSplits} from {@code props}. The advanced
     * {@code nextDocID} is written back to {@code state} after all documents were generated.
     *
     * @param state shared random walk state
     * @param env   environment providing the connector
     * @param props node properties
     * @throws Exception if writing, sorting or importing fails
     */
    @Override
    public void visit(State state, Environment env, Properties props) throws Exception {
        String indexTableName = (String) state.get("indexTableName");
        String dataTableName = (String) state.get("docTableName");
        int numPartitions = (Integer) state.get("numPartitions");
        Random rand = (Random) state.get("rand");
        long nextDocID = (Long) state.get("nextDocID");

        int minInsert = Integer.parseInt(props.getProperty("minInsert"));
        int maxInsert = Integer.parseInt(props.getProperty("maxInsert"));
        // Random.nextInt(int) rejects a bound of zero, so equal bounds mean a fixed count.
        int numToInsert = maxInsert == minInsert
                ? minInsert
                : rand.nextInt(maxInsert - minInsert) + minInsert;
        int maxSplits = Integer.parseInt(props.getProperty("maxSplits"));

        Configuration conf = CachedConfiguration.getInstance();
        FileSystem fs = FileSystem.get(conf);
        String rootDir = "/tmp/shard_bulk/" + dataTableName;
        fs.mkdirs(new Path(rootDir));

        // Close both writers even when document generation fails part way through.
        SeqfileBatchWriter dataWriter = new SeqfileBatchWriter(conf, fs, rootDir + "/data.seq");
        try {
            SeqfileBatchWriter indexWriter = new SeqfileBatchWriter(conf, fs, rootDir + "/index.seq");
            try {
                for (int i = 0; i < numToInsert; ++i) {
                    String docID = Insert.insertRandomDocument(nextDocID++, dataWriter, indexWriter,
                            indexTableName, dataTableName, numPartitions, rand);
                    this.log.debug("Bulk inserting document " + docID);
                }
                state.set("nextDocID", nextDocID);
            } finally {
                indexWriter.close();
            }
        } finally {
            dataWriter.close();
        }

        this.sort(state, env, fs, dataTableName, rootDir + "/data.seq", rootDir + "/data_bulk",
                rootDir + "/data_work", maxSplits);
        this.sort(state, env, fs, indexTableName, rootDir + "/index.seq", rootDir + "/index_bulk",
                rootDir + "/index_work", maxSplits);
        this.bulkImport(fs, state, env, dataTableName, rootDir, "data");
        this.bulkImport(fs, state, env, indexTableName, rootDir, "index");
        fs.delete(new Path(rootDir), true);
    }

    /**
     * Imports {@code <rootDir>/<prefix>_bulk} into {@code tableName}, retrying until the
     * failure directory {@code <rootDir>/<prefix>_failure} is empty after an attempt.
     *
     * <p>Between attempts, failed files whose name does not end in {@code .seq} are moved back
     * into the bulk directory; files ending in {@code .seq} are only logged.
     *
     * @param fs        file system holding the bulk and failure directories
     * @param state     shared random walk state (not used here)
     * @param env       environment providing the connector
     * @param tableName table to import into
     * @param rootDir   parent directory of the bulk and failure directories
     * @param prefix    directory name prefix, {@code "data"} or {@code "index"} in this class
     * @throws Exception if a file system operation or the import call fails
     */
    private void bulkImport(FileSystem fs, State state, Environment env, String tableName, String rootDir, String prefix) throws Exception {
        String bulkDir = rootDir + "/" + prefix + "_bulk";
        String failDir = rootDir + "/" + prefix + "_failure";
        Path bulkPath = new Path(bulkDir);
        Path failPath = new Path(failDir);

        while (true) {
            // Start every attempt with a fresh, empty failure directory.
            fs.delete(failPath, true);
            fs.mkdirs(failPath);
            env.getConnector().tableOperations().importDirectory(tableName, bulkDir, failDir, true);

            FileStatus[] failures = fs.listStatus(failPath);
            if (failures == null || failures.length <= 0) {
                break;
            }

            this.log.warn("Failed to bulk import some files, retrying ");
            for (FileStatus failure : failures) {
                Path failedFile = failure.getPath();
                if (failedFile.getName().endsWith(".seq")) {
                    this.log.debug("Ignoring " + failedFile);
                    continue;
                }
                // Put the file back so that the next attempt picks it up again.
                if (!fs.rename(failedFile, new Path(bulkPath, failedFile.getName()))) {
                    this.log.warn("Could not move " + failedFile + " back to " + bulkDir);
                }
            }
            UtilWaitThread.sleep(RETRY_SLEEP_MILLIS);
        }
    }

    /**
     * Writes the table's split points, Base64 encoded and one per line, to
     * {@code <workDir>/splits.txt} and then runs {@link SortTool} through {@link ToolRunner}.
     *
     * @param state     shared random walk state (not used here)
     * @param env       environment providing the connector
     * @param fs        file system to write the splits file to
     * @param tableName table whose splits are listed
     * @param seqFile   sequence file handed to the sort tool
     * @param outputDir output directory handed to the sort tool
     * @param workDir   directory that receives {@code splits.txt}
     * @param maxSplits maximum number of splits requested from the table
     * @throws Exception if the splits file cannot be written or the tool exits non-zero
     */
    private void sort(State state, Environment env, FileSystem fs, String tableName, String seqFile, String outputDir, String workDir, int maxSplits) throws Exception {
        String splitsFile = workDir + "/splits.txt";
        Connector conn = env.getConnector();
        Collection<Text> splits = conn.tableOperations().listSplits(tableName, maxSplits);

        try (PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(new Path(splitsFile))),
                false, StandardCharsets.UTF_8.name())) {
            for (Text split : splits) {
                out.println(Base64.encodeBase64String(TextUtil.getBytes(split)));
            }
            // PrintStream swallows I/O errors; surface them instead of sorting with a bad file.
            if (out.checkError()) {
                throw new IOException("Failed to write splits file " + splitsFile);
            }
        }

        SortTool sortTool = new SortTool(seqFile, outputDir, splitsFile, splits);
        String[] args = new String[]{"-libjars", this.getMapReduceJars()};
        if (ToolRunner.run(CachedConfiguration.getInstance(), sortTool, args) != 0) {
            // NOTE(review): the message says "verify" although this step runs SortTool;
            // kept unchanged in case anything matches on the text.
            throw new Exception("Failed to run map/red verify");
        }
    }

    /**
     * {@link BatchWriter} that appends every column update of a mutation to a Hadoop
     * sequence file as a {@link Key}/{@link Value} pair instead of sending it to a table.
     */
    class SeqfileBatchWriter
    implements BatchWriter {
        SequenceFile.Writer writer;

        /**
         * Opens a sequence file writer with {@link Key} keys and {@link Value} values.
         *
         * @param conf Hadoop configuration used to create the writer
         * @param fs   file system used to qualify {@code file}
         * @param file path of the sequence file to create
         * @throws IOException if the writer cannot be created
         */
        SeqfileBatchWriter(Configuration conf, FileSystem fs, String file) throws IOException {
            this.writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(fs.makeQualified(new Path(file))),
                    SequenceFile.Writer.keyClass(Key.class),
                    SequenceFile.Writer.valueClass(Value.class));
        }

        /**
         * Appends one key/value pair per column update of {@code m}; every key is built with
         * timestamp {@code Long.MAX_VALUE}.
         *
         * @throws RuntimeException wrapping the {@link IOException} if an append fails
         */
        @Override
        public void addMutation(Mutation m) throws MutationsRejectedException {
            List<ColumnUpdate> updates = m.getUpdates();
            for (ColumnUpdate cu : updates) {
                Key key = new Key(m.getRow(), cu.getColumnFamily(), cu.getColumnQualifier(), cu.getColumnVisibility(), Long.MAX_VALUE, false, false);
                Value val = new Value(cu.getValue(), false);
                try {
                    this.writer.append(key, val);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        /** Appends every mutation of {@code iterable} via {@link #addMutation(Mutation)}. */
        @Override
        public void addMutations(Iterable<Mutation> iterable) throws MutationsRejectedException {
            for (Mutation mutation : iterable) {
                this.addMutation(mutation);
            }
        }

        /** Does nothing. */
        @Override
        public void flush() throws MutationsRejectedException {
        }

        /**
         * Closes the underlying sequence file writer.
         *
         * @throws RuntimeException wrapping the {@link IOException} if closing fails
         */
        @Override
        public void close() throws MutationsRejectedException {
            try {
                this.writer.close();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

