/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.master.tableOps;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.master.tableOps.LoadFiles;
import org.apache.accumulo.master.tableOps.MasterRepo;
import org.apache.accumulo.master.tableOps.Utils;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.tablets.UniqueNameAllocator;
import org.apache.accumulo.server.util.MetadataTableUtil;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * First step of a bulk import: validates the error directory, allocates a fresh
 * {@code b-*} bulk directory under the table's directory, moves the candidate files
 * from the source directory into it and hands over to {@link LoadFiles}.
 *
 * <p>The read lock on the table and the reservations on the source and error
 * directories taken here are released in {@link #undo(long, Master)}.
 */
public class BulkImport
extends MasterRepo {
    public static final String FAILURES_TXT = "failures.txt";
    private static final long serialVersionUID = 1L;
    private static final Logger log = LoggerFactory.getLogger(BulkImport.class);
    private String tableId;
    private String sourceDir;
    private String errorDir;
    private boolean setTime;

    /**
     * @param tableId   id of the table the files are imported into
     * @param sourceDir directory holding the files to import
     * @param errorDir  directory that must exist and be empty when {@link #call} runs
     * @param setTime   passed through unchanged to {@link LoadFiles}
     */
    public BulkImport(String tableId, String sourceDir, String errorDir, boolean setTime) {
        this.tableId = tableId;
        this.sourceDir = sourceDir;
        this.errorDir = errorDir;
        this.setTime = setTime;
    }

    /**
     * Tries to take the table read lock and reserve the source and error directories.
     *
     * @return {@code 100} when the read lock could not be taken; otherwise the result of
     *         the directory reservations (the error directory is only reserved once the
     *         source directory reservation returned {@code 0}). Presumably a non-zero
     *         value is a retry delay for the caller - confirm against the Repo contract.
     * @throws ThriftTableOperationException with type {@code OFFLINE} when the table is
     *         not online
     */
    @Override
    public long isReady(long tid, Master master) throws Exception {
        if (!Utils.getReadLock(this.tableId, tid).tryLock()) {
            return 100L;
        }
        // Drop cached table info before reading the table state.
        Tables.clearCache(master.getInstance());
        if (Tables.getTableState(master.getInstance(), this.tableId) != TableState.ONLINE) {
            throw new ThriftTableOperationException(this.tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OFFLINE, null);
        }
        long reserved = Utils.reserveHdfsDirectory(this.sourceDir, tid);
        if (reserved == 0L) {
            reserved = Utils.reserveHdfsDirectory(this.errorDir, tid);
        }
        return reserved;
    }

    /**
     * Validates the error directory, starts the {@code bulkTx} arbitrator for this
     * transaction and moves the source files into a newly created bulk directory.
     *
     * @return the {@link LoadFiles} step that continues the import
     * @throws ThriftTableOperationException with type {@code BULK_BAD_ERROR_DIRECTORY}
     *         when the error directory is missing, is not a directory or is not empty,
     *         and with type {@code BULK_BAD_INPUT_DIRECTORY} when preparing the bulk
     *         directory fails with an {@link IOException}
     */
    @Override
    public Repo<Master> call(long tid, Master master) throws Exception {
        log.debug(" tid {} sourceDir {}", tid, this.sourceDir);
        Utils.getReadLock(this.tableId, tid).lock();
        VolumeManager fs = master.getFileSystem();
        Path errorPath = new Path(this.errorDir);
        FileStatus errorStatus = null;
        try {
            errorStatus = fs.getFileStatus(errorPath);
        }
        catch (FileNotFoundException ignored) {
            // Deliberately ignored: a missing directory is reported just below
            // through the errorStatus == null check.
        }
        if (errorStatus == null) {
            throw new ThriftTableOperationException(this.tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, this.errorDir + " does not exist");
        }
        if (!errorStatus.isDirectory()) {
            throw new ThriftTableOperationException(this.tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, this.errorDir + " is not a directory");
        }
        if (fs.listStatus(errorPath).length != 0) {
            throw new ThriftTableOperationException(this.tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, this.errorDir + " is not empty");
        }
        TransactionWatcher.ZooArbitrator.start("bulkTx", tid);
        try {
            String bulkDir = this.prepareBulkImport(master, fs, this.sourceDir, this.tableId);
            log.debug(" tid {} bulkDir {}", tid, bulkDir);
            return new LoadFiles(this.tableId, this.sourceDir, bulkDir, this.errorDir, this.setTime);
        }
        catch (IOException ex) {
            log.error("error preparing the bulk import directory", ex);
            throw new ThriftTableOperationException(this.tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_INPUT_DIRECTORY, this.sourceDir + ": " + ex);
        }
    }

    /**
     * Creates a new, uniquely named {@code b-*} directory below the table directory on
     * the volume that matches the source directory.
     *
     * @throws IOException when the source directory is not on a configured volume or the
     *         freshly generated directory name already exists
     */
    private Path createNewBulkDir(VolumeManager fs, String tableId) throws IOException {
        Path tablesDir = fs.matchingFileSystem(new Path(this.sourceDir), ServerConstants.getTablesDirs());
        if (tablesDir == null) {
            throw new IOException(this.sourceDir + " is not in a volume configured for Accumulo");
        }
        Path directory = new Path(tablesDir.toString() + "/" + tableId);
        fs.mkdirs(directory);
        UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
        // Retry with a new name every three seconds until mkdirs reports success.
        while (true) {
            Path newBulkDir = new Path(directory, "b-" + namer.getNextName());
            if (fs.exists(newBulkDir)) {
                throw new IOException("Dir exist when it should not " + newBulkDir);
            }
            if (fs.mkdirs(newBulkDir)) {
                return newBulkDir;
            }
            log.warn("Failed to create {} for unknown reason", newBulkDir);
            UtilWaitThread.sleep(3000L);
        }
    }

    /**
     * Creates the bulk directory, flags the bulk load as in progress and moves every
     * entry of {@code dir} into the bulk directory using a pool of worker threads.
     *
     * @return the bulk directory path as a string
     * @throws Exception the first exception reported by a worker, or any failure while
     *         creating the directory or listing the source
     */
    private String prepareBulkImport(Master master, final VolumeManager fs, String dir, String tableId) throws Exception {
        final Path bulkDir = this.createNewBulkDir(fs, tableId);
        MetadataTableUtil.addBulkLoadInProgressFlag(master, "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
        FileStatus[] mapFiles = fs.listStatus(new Path(dir));
        final UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
        int workerCount = master.getConfiguration().getCount(Property.MASTER_BULK_RENAME_THREADS);
        SimpleThreadPool workers = new SimpleThreadPool(workerCount, "bulk move");
        ArrayList<Future<Exception>> results = new ArrayList<Future<Exception>>();
        for (final FileStatus fileStatus : mapFiles) {
            results.add(workers.submit(new Callable<Exception>(){

                @Override
                public Exception call() {
                    // Failures are handed back as the task result so the submitting
                    // thread can rethrow the first one after all moves have finished.
                    try {
                        BulkImport.moveIntoBulkDir(fs, fileStatus, bulkDir, namer);
                        return null;
                    }
                    catch (Exception ex) {
                        return ex;
                    }
                }
            }));
        }
        workers.shutdown();
        // Block until every submitted move has completed.
        while (!workers.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
        }
        for (Future<Exception> result : results) {
            Exception failure = result.get();
            if (failure != null) {
                throw failure;
            }
        }
        return bulkDir.toString();
    }

    /**
     * Moves one source entry into the bulk directory under a new {@code I*} name.
     * Entries with an unknown extension, and entries treated as map files that do not
     * look like one (not a directory, named {@code _logs}, or lacking a plain
     * {@code data} file), are logged and skipped. A failed rename is logged and
     * otherwise ignored.
     */
    private static void moveIntoBulkDir(VolumeManager fs, FileStatus fileStatus, Path bulkDir, UniqueNameAllocator namer) throws IOException {
        Path source = fileStatus.getPath();
        String[] nameParts = source.getName().split("\\.");
        String extension;
        if (nameParts.length > 1) {
            extension = nameParts[nameParts.length - 1];
            if (!FileOperations.getValidExtensions().contains(extension)) {
                log.warn("{} does not have a valid extension, ignoring", source);
                return;
            }
        } else {
            // Names without any extension are treated as map files.
            extension = "map";
        }
        if (extension.equals("map")) {
            if (!fileStatus.isDirectory()) {
                log.warn("{} is not a map file, ignoring", source);
                return;
            }
            if (source.getName().equals("_logs")) {
                log.info("{} is probably a log directory from a map/reduce task, skipping", source);
                return;
            }
            try {
                FileStatus dataStatus = fs.getFileStatus(new Path(source, "data"));
                if (dataStatus.isDirectory()) {
                    log.warn("{} is not a map file, ignoring", source);
                    return;
                }
            }
            catch (FileNotFoundException fnfe) {
                log.warn("{} is not a map file, ignoring", source);
                return;
            }
        }
        Path newPath = new Path(bulkDir, "I" + namer.getNextName() + "." + extension);
        try {
            fs.rename(source, newPath);
            log.debug("Moved {} to {}", source, newPath);
        }
        catch (IOException e) {
            // Best effort: the import continues without this file. The exception is
            // passed as the trailing argument so the stack trace is logged as well.
            log.error("Could not move: {} {}", source, e.getMessage(), e);
        }
    }

    /**
     * Releases the directory reservations and the table read lock and cleans up the
     * {@code bulkTx} arbitrator entry for this transaction.
     */
    @Override
    public void undo(long tid, Master environment) throws Exception {
        Utils.unreserveHdfsDirectory(this.sourceDir, tid);
        Utils.unreserveHdfsDirectory(this.errorDir, tid);
        Utils.getReadLock(this.tableId, tid).unlock();
        TransactionWatcher.ZooArbitrator.cleanup("bulkTx", tid);
    }
}

