/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
public class TestParallelReadUtil {
    static final Log LOG = LogFactory.getLog(TestParallelReadUtil.class);
    static BlockReaderTestUtil util = null;
    static DFSClient dfsClient = null;
    static final int FILE_SIZE_K = 256;
    static Random rand = null;
    static final int DEFAULT_REPLICATION_FACTOR = 2;
    protected boolean verifyChecksums = true;

    public static void setupCluster(int replicationFactor, HdfsConfiguration conf) throws Exception {
        util = new BlockReaderTestUtil(replicationFactor, conf);
        dfsClient = util.getDFSClient();
        long seed = Time.now();
        LOG.info((Object)("Random seed: " + seed));
        rand = new Random(seed);
    }

    boolean runParallelRead(int nFiles, int nWorkerEach, ReadWorkerHelper helper) throws IOException {
        ReadWorker[] workers = new ReadWorker[nFiles * nWorkerEach];
        TestFileInfo[] testInfoArr = new TestFileInfo[nFiles];
        int nWorkers = 0;
        for (int i = 0; i < nFiles; ++i) {
            TestFileInfo testInfo;
            testInfoArr[i] = testInfo = new TestFileInfo();
            testInfo.filepath = new Path("/TestParallelRead.dat." + i);
            testInfo.authenticData = util.writeFile(testInfo.filepath, 256);
            testInfo.dis = dfsClient.open(testInfo.filepath.toString(), TestParallelReadUtil.dfsClient.getConf().ioBufferSize, this.verifyChecksums);
            for (int j = 0; j < nWorkerEach; ++j) {
                workers[nWorkers++] = new ReadWorker(testInfo, nWorkers, helper);
            }
        }
        long starttime = Time.monotonicNow();
        for (ReadWorker worker : workers) {
            worker.start();
        }
        for (ReadWorker worker : workers) {
            try {
                worker.join();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
        long endtime = Time.monotonicNow();
        for (TestFileInfo testInfo : testInfoArr) {
            testInfo.dis.close();
        }
        boolean res = true;
        long totalRead = 0L;
        for (ReadWorker worker : workers) {
            long nread = worker.getBytesRead();
            LOG.info((Object)("--- Report: " + worker.getName() + " read " + nread + " B; " + "average " + nread / 1024L + " B per read"));
            totalRead += nread;
            if (!worker.hasError()) continue;
            res = false;
        }
        double timeTakenSec = (double)(endtime - starttime) / 1000.0;
        long totalReadKB = totalRead / 1024L;
        LOG.info((Object)("=== Report: " + nWorkers + " threads read " + totalReadKB + " KB (across " + nFiles + " file(s)) in " + timeTakenSec + "s; average " + (double)totalReadKB / timeTakenSec + " KB/s"));
        return res;
    }

    public void runTestWorkload(ReadWorkerHelper helper) throws IOException {
        if (!this.runParallelRead(1, 4, helper)) {
            Assert.fail((String)"Check log for errors");
        }
        if (!this.runParallelRead(1, 16, helper)) {
            Assert.fail((String)"Check log for errors");
        }
        if (!this.runParallelRead(2, 4, helper)) {
            Assert.fail((String)"Check log for errors");
        }
    }

    public static void teardownCluster() throws Exception {
        util.shutdown();
    }

    @Test
    public void testParallelReadCopying() throws IOException {
        this.runTestWorkload(new CopyingReadWorkerHelper());
    }

    @Test
    public void testParallelReadByteBuffer() throws IOException {
        this.runTestWorkload(new DirectReadWorkerHelper());
    }

    @Test
    public void testParallelReadMixed() throws IOException {
        this.runTestWorkload(new MixedWorkloadHelper());
    }

    @Test
    public void testParallelNoChecksums() throws IOException {
        this.verifyChecksums = false;
        this.runTestWorkload(new MixedWorkloadHelper());
    }

    static {
        LogManager.getLogger((String)(DataNode.class.getName() + ".clienttrace")).setLevel(Level.WARN);
    }

    static class ReadWorker
    extends Thread {
        public static final int N_ITERATIONS = 1024;
        private static final double PROPORTION_NON_POSITIONAL_READ = 0.1;
        private final TestFileInfo testInfo;
        private final long fileSize;
        private long bytesRead;
        private boolean error;
        private final ReadWorkerHelper helper;
        static int readCount = 0;

        ReadWorker(TestFileInfo testInfo, int id, ReadWorkerHelper helper) {
            super("ReadWorker-" + id + "-" + testInfo.filepath.toString());
            this.testInfo = testInfo;
            this.helper = helper;
            this.fileSize = testInfo.dis.getFileLength();
            Assert.assertEquals((long)this.fileSize, (long)testInfo.authenticData.length);
            this.bytesRead = 0L;
            this.error = false;
        }

        @Override
        public void run() {
            for (int i = 0; i < 1024; ++i) {
                int startOff = rand.nextInt((int)this.fileSize);
                int len = 0;
                try {
                    double p = rand.nextDouble();
                    if (p < 0.1) {
                        len = Math.min(rand.nextInt(64), (int)this.fileSize - startOff);
                        this.read(startOff, len);
                        this.bytesRead += (long)len;
                        continue;
                    }
                    len = rand.nextInt((int)(this.fileSize - (long)startOff));
                    this.pRead(startOff, len);
                    this.bytesRead += (long)len;
                    continue;
                }
                catch (Throwable t) {
                    LOG.error((Object)(this.getName() + ": Error while testing read at " + startOff + " length " + len), t);
                    this.error = true;
                    Assert.fail((String)t.getMessage());
                }
            }
        }

        public long getBytesRead() {
            return this.bytesRead;
        }

        public boolean hasError() {
            return this.error;
        }

        private void read(int start, int len) throws Exception {
            Assert.assertTrue((String)("Bad args: " + start + " + " + len + " should be <= " + this.fileSize), ((long)(start + len) <= this.fileSize ? 1 : 0) != 0);
            ++readCount;
            DFSInputStream dis = this.testInfo.dis;
            byte[] buf = new byte[len];
            this.helper.read(dis, buf, start, len);
            this.verifyData("Read data corrupted", buf, start, start + len);
        }

        private void pRead(int start, int len) throws Exception {
            Assert.assertTrue((String)("Bad args: " + start + " + " + len + " should be <= " + this.fileSize), ((long)(start + len) <= this.fileSize ? 1 : 0) != 0);
            DFSInputStream dis = this.testInfo.dis;
            byte[] buf = new byte[len];
            this.helper.pRead(dis, buf, start, len);
            this.verifyData("Pread data corrupted", buf, start, start + len);
        }

        private void verifyData(String msg, byte[] actual, int start, int end) throws Exception {
            byte[] auth = this.testInfo.authenticData;
            if (end > auth.length) {
                throw new Exception(msg + ": Actual array (" + end + ") is past the end of authentic data (" + auth.length + ")");
            }
            int j = start;
            int i = 0;
            while (i < actual.length) {
                if (auth[j] != actual[i]) {
                    throw new Exception(msg + ": Arrays byte " + i + " (at offset " + j + ") differs: expect " + auth[j] + " got " + actual[i]);
                }
                ++i;
                ++j;
            }
        }
    }

    static class MixedWorkloadHelper
    implements ReadWorkerHelper {
        private final DirectReadWorkerHelper bb = new DirectReadWorkerHelper();
        private final CopyingReadWorkerHelper copy = new CopyingReadWorkerHelper();
        private final double COPYING_PROBABILITY = 0.5;

        MixedWorkloadHelper() {
        }

        @Override
        public int read(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException {
            double p = rand.nextDouble();
            if (p > 0.5) {
                return this.bb.read(dis, target, startOff, len);
            }
            return this.copy.read(dis, target, startOff, len);
        }

        @Override
        public int pRead(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException {
            double p = rand.nextDouble();
            if (p > 0.5) {
                return this.bb.pRead(dis, target, startOff, len);
            }
            return this.copy.pRead(dis, target, startOff, len);
        }
    }

    static class CopyingReadWorkerHelper
    implements ReadWorkerHelper {
        CopyingReadWorkerHelper() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public int read(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException {
            int cnt;
            DFSInputStream dFSInputStream = dis;
            synchronized (dFSInputStream) {
                int read;
                dis.seek((long)startOff);
                for (cnt = 0; cnt < len; cnt += read) {
                    read = dis.read(target, cnt, len - cnt);
                    if (read != -1) continue;
                    return read;
                }
            }
            return cnt;
        }

        @Override
        public int pRead(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException {
            int cnt;
            int read;
            for (cnt = 0; cnt < len; cnt += read) {
                read = dis.read((long)startOff, target, cnt, len - cnt);
                if (read != -1) continue;
                return read;
            }
            return cnt;
        }
    }

    static class DirectReadWorkerHelper
    implements ReadWorkerHelper {
        DirectReadWorkerHelper() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public int read(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException {
            int cnt;
            ByteBuffer bb = ByteBuffer.allocateDirect(target.length);
            DFSInputStream dFSInputStream = dis;
            synchronized (dFSInputStream) {
                int read;
                dis.seek((long)startOff);
                for (cnt = 0; cnt < len; cnt += read) {
                    read = dis.read(bb);
                    if (read != -1) continue;
                    return read;
                }
            }
            bb.clear();
            bb.get(target);
            return cnt;
        }

        @Override
        public int pRead(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException {
            return this.read(dis, target, startOff, len);
        }
    }

    static interface ReadWorkerHelper {
        public int read(DFSInputStream var1, byte[] var2, int var3, int var4) throws IOException;

        public int pRead(DFSInputStream var1, byte[] var2, int var3, int var4) throws IOException;
    }

    private class TestFileInfo {
        public DFSInputStream dis;
        public Path filepath;
        public byte[] authenticData;

        private TestFileInfo() {
        }
    }
}

