/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.balancer;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;

public class TestBalancer {
    // Shared logger. Note that the category string names org.apache.hadoop.hdfs.TestBalancer,
    // which is not the package this class is declared in.
    private static final Log LOG = LogFactory.getLog((String)"org.apache.hadoop.hdfs.TestBalancer");
    // Capacity value for a single simulated datanode; the test scenarios below pass 500L
    // (presumably this constant, inlined by the compiler) as simulated capacities.
    static final long CAPACITY = 500L;
    // Rack names handed to MiniDFSCluster when datanodes are started.
    static final String RACK0 = "/rack0";
    static final String RACK1 = "/rack1";
    static final String RACK2 = "/rack2";
    // Name of the single file the tests create; also used to look up its block locations.
    private static final String fileName = "/tmp.txt";
    // Path form of fileName; assigned in the static initializer at the end of the class.
    static final Path filePath;
    // Cluster and NameNode client of the scenario currently running; reassigned by each scenario.
    private MiniDFSCluster cluster;
    ClientProtocol client;
    // Deadline, in milliseconds, matching the 40000L used by the wait helpers below.
    static final long TIMEOUT = 40000L;
    // Relative tolerance matching the 0.005 used by waitForHeartBeat when comparing reported
    // total/used space with the expected values.
    static final double CAPACITY_ALLOWED_VARIANCE = 0.005;
    // Tolerance matching the 0.11 used by waitForBalancer when comparing a datanode's
    // utilization with the cluster average.
    static final double BALANCE_ALLOWED_VARIANCE = 0.11;
    // Matches the value 10 that initConf sets for block size and bytes-per-checksum.
    static final int DEFAULT_BLOCK_SIZE = 10;
    // Random source for file content seeds and datanode selection; assigned in the static initializer.
    private static final Random r;

    /**
     * Applies the configuration shared by the balancer tests: block size and
     * bytes-per-checksum are both set to {@link #DEFAULT_BLOCK_SIZE}, the
     * heartbeat interval to 1, the simulated dataset factory is installed, and
     * the balancer's moved-window width is set to 2000.
     *
     * @param conf configuration to modify in place
     */
    static void initConf(Configuration conf) {
        final int blockSize = DEFAULT_BLOCK_SIZE;
        conf.setLong("dfs.blocksize", (long) blockSize);
        conf.setInt("dfs.bytes-per-checksum", blockSize);
        conf.setLong("dfs.heartbeat.interval", 1L);
        SimulatedFSDataset.setFactory(conf);
        conf.setLong("dfs.balancer.movedWinWidth", 2000L);
    }

    /**
     * Creates a file on the file system of the selected NameNode and waits
     * until it has reached the requested replication.
     *
     * @param cluster           cluster to create the file in
     * @param filePath          path of the file to create
     * @param fileLen           length of the file
     * @param replicationFactor replication requested for, and awaited on, the file
     * @param nnIndex           index of the NameNode whose file system is used
     */
    static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen, short replicationFactor, int nnIndex) throws IOException, InterruptedException, TimeoutException {
        FileSystem targetFs = cluster.getFileSystem(nnIndex);
        // A fresh random seed is passed to the file creation helper on every call.
        long seed = r.nextLong();
        DFSTestUtil.createFile(targetFs, filePath, fileLen, replicationFactor, seed);
        DFSTestUtil.waitReplication(targetFs, filePath, replicationFactor);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts a temporary cluster with {@code numNodes} datanodes, writes one
     * file with replication {@code numNodes - 1} and length
     * {@code size / (numNodes - 1)}, and returns copies of the blocks the
     * NameNode reports for that file. The temporary cluster is always shut
     * down before this method returns.
     *
     * @param conf     configuration used to build the cluster and the client proxy
     * @param size     total number of bytes across all replicas
     * @param numNodes number of datanodes; must be at least 2
     * @return one {@link ExtendedBlock} per located block of the created file
     * @throws IllegalArgumentException if {@code numNodes} is less than 2
     */
    private ExtendedBlock[] generateBlocks(Configuration conf, long size, short numNodes) throws IOException, InterruptedException, TimeoutException {
        if (numNodes < 2) {
            // A replication factor of zero would otherwise surface as a division by
            // zero only after a whole cluster had been started.
            throw new IllegalArgumentException("numNodes must be at least 2 so that the replication factor (numNodes - 1) is positive, got " + numNodes);
        }
        this.cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numNodes).build();
        try {
            this.cluster.waitActive();
            this.client = NameNodeProxies.createProxy(conf, this.cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
            short replicationFactor = (short)(numNodes - 1);
            long fileLen = size / (long)replicationFactor;
            TestBalancer.createFile(this.cluster, filePath, fileLen, replicationFactor, 0);
            List<LocatedBlock> locatedBlocks = this.client.getBlockLocations(fileName, 0L, fileLen).getLocatedBlocks();
            ExtendedBlock[] blocks = new ExtendedBlock[locatedBlocks.size()];
            for (int i = 0; i < blocks.length; ++i) {
                ExtendedBlock b = locatedBlocks.get(i).getBlock();
                // Copy field by field so the result does not share objects with the located blocks.
                blocks[i] = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(), b.getNumBytes(), b.getGenerationStamp());
            }
            return blocks;
        }
        finally {
            this.cluster.shutdown();
        }
    }

    /**
     * Randomly assigns {@code replicationFactor} replicas of every block to
     * datanodes. A datanode is only eligible while its remaining budget
     * (initially {@code distribution[i]}) is positive; each placement
     * subtracts the block's byte count from the chosen node's budget. The
     * caller's {@code distribution} array is not modified.
     *
     * @param blocks            blocks to place
     * @param replicationFactor number of placements made for every block
     * @param distribution      per-datanode space budget
     * @return for every datanode index, the blocks assigned to it
     * @throws IllegalArgumentException if a replica has to be placed while no
     *         datanode has a positive remaining budget (previously this case
     *         looped forever)
     */
    static Block[][] distributeBlocks(ExtendedBlock[] blocks, short replicationFactor, long[] distribution) {
        // Work on a private copy so the caller's budget array stays intact.
        long[] remaining = new long[distribution.length];
        System.arraycopy(distribution, 0, remaining, 0, distribution.length);
        int numNodes = remaining.length;

        List<List<Block>> blockReports = new ArrayList<List<Block>>(numNodes);
        for (int i = 0; i < numNodes; ++i) {
            blockReports.add(new ArrayList<Block>());
        }

        for (ExtendedBlock block : blocks) {
            for (int replica = 0; replica < replicationFactor; ++replica) {
                if (!hasRemainingSpace(remaining)) {
                    // Without this check the random retry loop below could never terminate.
                    throw new IllegalArgumentException("distribution " + Arrays.toString(distribution) + " cannot hold " + replicationFactor + " replica(s) of " + blocks.length + " block(s)");
                }
                int chosenIndex;
                do {
                    chosenIndex = r.nextInt(numNodes);
                } while (remaining[chosenIndex] <= 0L);
                blockReports.get(chosenIndex).add(block.getLocalBlock());
                remaining[chosenIndex] -= block.getNumBytes();
            }
        }

        Block[][] results = new Block[numNodes][];
        for (int i = 0; i < numNodes; ++i) {
            List<Block> nodeBlockList = blockReports.get(i);
            results[i] = nodeBlockList.toArray(new Block[nodeBlockList.size()]);
        }
        return results;
    }

    /** Returns true when at least one entry of {@code remaining} is positive. */
    private static boolean hasRemainingSpace(long[] remaining) {
        for (long space : remaining) {
            if (space > 0L) {
                return true;
            }
        }
        return false;
    }

    /**
     * Adds up all elements of the given array.
     *
     * @param x values to add; an empty array yields 0
     * @return the sum of the elements
     */
    static long sum(long[] x) {
        long total = 0L;
        for (int i = 0; i < x.length; ++i) {
            total += x[i];
        }
        return total;
    }

    /**
     * Builds a cluster whose datanodes hold the given uneven amounts of block
     * data and runs the balancer on it.
     *
     * Blocks are first generated on a temporary cluster, then distributed
     * according to {@code distribution} and injected into a second cluster
     * that is started without formatting. The second cluster is shut down
     * even when the balancer run or one of its assertions fails.
     *
     * @param conf         configuration for both clusters; the safe-mode threshold is overwritten
     * @param distribution used space wanted on each datanode
     * @param capacities   simulated capacity of each datanode
     * @param racks        rack of each datanode
     * @throws IllegalArgumentException if the three arrays differ in length
     */
    private void testUnevenDistribution(Configuration conf, long[] distribution, long[] capacities, String[] racks) throws Exception {
        int numDatanodes = distribution.length;
        if (capacities.length != numDatanodes || racks.length != numDatanodes) {
            throw new IllegalArgumentException("Array length is not the same");
        }
        long totalUsedSpace = TestBalancer.sum(distribution);
        ExtendedBlock[] blocks = this.generateBlocks(conf, totalUsedSpace, (short)numDatanodes);
        Block[][] blocksDN = TestBalancer.distributeBlocks(blocks, (short)(numDatanodes - 1), distribution);
        conf.set("dfs.namenode.safemode.threshold-pct", "0.0f");
        this.cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).format(false).racks(racks).simulatedCapacities(capacities).build();
        try {
            this.cluster.waitActive();
            this.client = NameNodeProxies.createProxy(conf, this.cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
            for (int i = 0; i < blocksDN.length; ++i) {
                this.cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null);
            }
            long totalCapacity = TestBalancer.sum(capacities);
            this.runBalancer(conf, totalUsedSpace, totalCapacity);
        }
        finally {
            // Same cleanup pattern as the other scenarios: never leave the cluster running.
            this.cluster.shutdown();
        }
    }

    /**
     * Polls the NameNode statistics every 100 ms until both the reported total
     * space ({@code status[0]}) and used space ({@code status[1]}) are within
     * {@link #CAPACITY_ALLOWED_VARIANCE} (relative) of the expected values.
     *
     * An interrupt received while sleeping does not abort the wait; the
     * thread's interrupt status is restored before the method exits so that
     * callers can still observe it.
     *
     * @param expectedUsedSpace  used space the cluster should report
     * @param expectedTotalSpace total space the cluster should report
     * @param client             NameNode client the statistics are read from
     * @param cluster            unused by this method
     * @throws TimeoutException if the values are not reached within {@link #TIMEOUT} msec
     */
    static void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace, ClientProtocol client, MiniDFSCluster cluster) throws IOException, TimeoutException {
        long timeout = TIMEOUT;
        long failtime = timeout <= 0L ? Long.MAX_VALUE : Time.now() + timeout;
        boolean interrupted = false;
        try {
            while (true) {
                long[] status = client.getStats();
                double totalSpaceVariance = Math.abs((double)status[0] - (double)expectedTotalSpace) / (double)expectedTotalSpace;
                double usedSpaceVariance = Math.abs((double)status[1] - (double)expectedUsedSpace) / (double)expectedUsedSpace;
                if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE && usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE) {
                    return;
                }
                if (Time.now() > failtime) {
                    throw new TimeoutException("Cluster failed to reached expected values of totalSpace (current: " + status[0] + ", expected: " + expectedTotalSpace + "), or usedSpace (current: " + status[1] + ", expected: " + expectedUsedSpace + "), in more than " + timeout + " msec.");
                }
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException e) {
                    // Keep polling, but remember the interrupt instead of dropping it.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Polls the datanode report until every datanode's utilization
     * (dfsUsed / capacity) is within {@link #BALANCE_ALLOWED_VARIANCE} of the
     * average utilization {@code totalUsedSpace / totalCapacity}.
     *
     * Each round asserts that the report has as many entries as the cluster
     * has datanodes. At the first datanode found out of range the method
     * checks the deadline, sleeps 100 ms and fetches a fresh report. An
     * interrupt received while sleeping does not abort the wait; the thread's
     * interrupt status is restored before the method exits.
     *
     * @param totalUsedSpace used space across the cluster
     * @param totalCapacity  capacity across the cluster
     * @param client         NameNode client the datanode report is read from
     * @param cluster        cluster whose datanode count the report must match
     * @throws TimeoutException if a datanode is still out of range after {@link #TIMEOUT} msec
     */
    static void waitForBalancer(long totalUsedSpace, long totalCapacity, ClientProtocol client, MiniDFSCluster cluster) throws IOException, TimeoutException {
        long timeout = TIMEOUT;
        long failtime = timeout <= 0L ? Long.MAX_VALUE : Time.now() + timeout;
        double avgUtilization = (double)totalUsedSpace / (double)totalCapacity;
        boolean interrupted = false;
        try {
            boolean balanced;
            do {
                DatanodeInfo[] datanodeReport = client.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL);
                Assert.assertEquals((long)datanodeReport.length, (long)cluster.getDataNodes().size());
                balanced = true;
                for (DatanodeInfo datanode : datanodeReport) {
                    double nodeUtilization = (double)datanode.getDfsUsed() / (double)datanode.getCapacity();
                    if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) {
                        balanced = false;
                        if (Time.now() > failtime) {
                            throw new TimeoutException("Rebalancing expected avg utilization to become " + avgUtilization + ", but on datanode " + datanode + " it remains at " + nodeUtilization + " after more than " + TIMEOUT + " msec.");
                        }
                        try {
                            Thread.sleep(100L);
                        }
                        catch (InterruptedException e) {
                            // Keep polling, but remember the interrupt instead of dropping it.
                            interrupted = true;
                        }
                        // One unbalanced node is enough; go fetch a fresh report.
                        break;
                    }
                }
            } while (!balanced);
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Formats an array as {@code [a, b, c]} for log output.
     *
     * @param array values to format
     * @return the bracketed, comma-separated values, or {@code <empty>} for an empty array
     */
    String long2String(long[] array) {
        // Arrays.toString yields the same "[a, b, c]" layout; only the empty case differs.
        return array.length == 0 ? "<empty>" : Arrays.toString(array);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs one balancing scenario: starts a cluster with the given datanodes,
     * writes a file so that 3/10 of the initial total capacity is used, adds
     * one more datanode and then runs the balancer, either through the
     * command-line tool or through the direct entry point. The cluster is
     * shut down afterwards in every case.
     *
     * @param conf        configuration for the cluster and the balancer
     * @param capacities  simulated capacity of each initial datanode
     * @param racks       rack of each initial datanode; same length as {@code capacities}
     * @param newCapacity simulated capacity of the datanode added later
     * @param newRack     rack of the datanode added later
     * @param useTool     true to run the balancer via {@link #runBalancerCli}, false via {@link #runBalancer}
     */
    private void doTest(Configuration conf, long[] capacities, String[] racks, long newCapacity, String newRack, boolean useTool) throws Exception {
        LOG.info("capacities = " + long2String(capacities));
        LOG.info("racks      = " + Arrays.asList(racks));
        LOG.info("newCapacity= " + newCapacity);
        LOG.info("newRack    = " + newRack);
        LOG.info("useTool    = " + useTool);
        Assert.assertEquals((long) capacities.length, (long) racks.length);

        final int numOfDatanodes = capacities.length;
        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
        cluster = builder.numDataNodes(numOfDatanodes).racks(racks).simulatedCapacities(capacities).build();
        try {
            cluster.waitActive();
            URI nameNodeUri = cluster.getFileSystem(0).getUri();
            client = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class).getProxy();

            final long initialCapacity = sum(capacities);
            final long usedSpace = initialCapacity * 3L / 10L;
            // Replicating to every initial datanode makes each of them hold usedSpace / n bytes.
            createFile(cluster, filePath, usedSpace / (long) numOfDatanodes, (short) numOfDatanodes, 0);

            cluster.startDataNodes(conf, 1, true, null, new String[]{newRack}, new long[]{newCapacity});
            final long grownCapacity = initialCapacity + newCapacity;

            if (!useTool) {
                runBalancer(conf, usedSpace, grownCapacity);
            } else {
                runBalancerCli(conf, usedSpace, grownCapacity);
            }
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Waits for the cluster to report the expected space figures, runs the
     * balancer with its default parameters and checks the return code.
     *
     * When {@code dfs.datanode.balance.max.concurrent.moves} is 0 the balancer
     * is expected to return {@code NO_MOVE_PROGRESS} and nothing further is
     * checked. Otherwise it must return {@code SUCCESS}, after which the
     * method waits for the space figures again and for the cluster to become
     * balanced.
     *
     * @param conf           configuration the balancer runs with
     * @param totalUsedSpace expected used space across the cluster
     * @param totalCapacity  expected capacity across the cluster
     */
    private void runBalancer(Configuration conf, long totalUsedSpace, long totalCapacity) throws Exception {
        waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);

        Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
        final int exitCode = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);

        final int maxConcurrentMoves = conf.getInt("dfs.datanode.balance.max.concurrent.moves", 5);
        if (maxConcurrentMoves != 0) {
            Assert.assertEquals((long) Balancer.ReturnStatus.SUCCESS.code, (long) exitCode);
            waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
            LOG.info("Rebalancing with default ctor.");
            waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
        } else {
            Assert.assertEquals((long) Balancer.ReturnStatus.NO_MOVE_PROGRESS.code, (long) exitCode);
        }
    }

    /**
     * Same flow as the direct balancer run, but drives the balancer through
     * {@code Balancer.Cli} with the arguments {@code -policy datanode} and
     * requires the tool to return 0.
     *
     * @param conf           configuration handed to the tool
     * @param totalUsedSpace expected used space across the cluster
     * @param totalCapacity  expected capacity across the cluster
     */
    private void runBalancerCli(Configuration conf, long totalUsedSpace, long totalCapacity) throws Exception {
        waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);

        Balancer.Cli balancerTool = new Balancer.Cli();
        balancerTool.setConf(conf);
        final int exitCode = balancerTool.run(new String[]{"-policy", "datanode"});
        Assert.assertEquals("Tools should exit 0 on success", 0L, (long) exitCode);

        waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
        LOG.info("Rebalancing with default ctor.");
        waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
    }

    /**
     * Scenario with one datanode of capacity {@link #CAPACITY} on rack 0, to
     * which a datanode of half that capacity on the same rack is added.
     *
     * @param conf    configuration for the scenario
     * @param useTool whether the balancer is run through the command-line tool
     */
    private void oneNodeTest(Configuration conf, boolean useTool) throws Exception {
        long[] capacities = {CAPACITY};
        String[] racks = {RACK0};
        doTest(conf, capacities, racks, CAPACITY / 2L, RACK0, useTool);
    }

    /**
     * Scenario with two datanodes of capacity {@link #CAPACITY} on racks 0 and
     * 1, to which a third datanode of the same capacity on rack 2 is added.
     * The balancer is run directly, not through the command-line tool.
     *
     * @param conf configuration for the scenario
     */
    private void twoNodeTest(Configuration conf) throws Exception {
        long[] capacities = {CAPACITY, CAPACITY};
        String[] racks = {RACK0, RACK1};
        doTest(conf, capacities, racks, CAPACITY, RACK2, false);
    }

    /**
     * Entry point for callers that bring their own configuration: applies the
     * shared test settings to it and runs the one-node scenario without the
     * command-line tool.
     *
     * @param conf configuration to initialise and run with
     */
    public void integrationTest(Configuration conf) throws Exception {
        initConf(conf);
        final boolean useTool = false;
        oneNodeTest(conf, useTool);
    }

    /**
     * Checks that the command-line parser rejects the threshold values 0 and
     * 101 with an {@link IllegalArgumentException} carrying the expected
     * message.
     */
    @Test(timeout=100000L)
    public void testBalancerCliParseWithThresholdOutOfBoundaries() {
        assertThresholdRejected("0", "Number out of range: threshold = 0.0");
        assertThresholdRejected("101", "Number out of range: threshold = 101.0");
    }

    /**
     * Parses {@code -threshold <threshold>} and fails the test unless an
     * IllegalArgumentException with exactly {@code expectedMessage} is thrown.
     */
    private static void assertThresholdRejected(String threshold, String expectedMessage) {
        String reason = "IllegalArgumentException is expected when threshold value is out of boundary.";
        String[] parameters = {"-threshold", threshold};
        try {
            Balancer.Cli.parse(parameters);
            Assert.fail(reason);
        } catch (IllegalArgumentException e) {
            Assert.assertEquals((Object) expectedMessage, (Object) e.getMessage());
        }
    }

    /** Runs {@link #testBalancer0Internal} with a fresh HDFS configuration. */
    @Test(timeout=100000L)
    public void testBalancer0() throws Exception {
        Configuration conf = new HdfsConfiguration();
        testBalancer0Internal(conf);
    }

    /**
     * Applies the shared test settings, then runs the one-node scenario
     * (without the command-line tool) followed by the two-node scenario.
     *
     * @param conf configuration to initialise and run with
     */
    void testBalancer0Internal(Configuration conf) throws Exception {
        initConf(conf);
        final boolean useTool = false;
        oneNodeTest(conf, useTool);
        twoNodeTest(conf);
    }

    /** Runs {@link #testBalancer1Internal} with a fresh HDFS configuration. */
    @Test(timeout=100000L)
    public void testBalancer1() throws Exception {
        Configuration conf = new HdfsConfiguration();
        testBalancer1Internal(conf);
    }

    /**
     * Applies the shared test settings and runs the uneven-distribution
     * scenario with two datanodes of capacity {@link #CAPACITY} on racks 0 and
     * 1, holding 50% and 10% of that capacity respectively.
     *
     * @param conf configuration to initialise and run with
     */
    void testBalancer1Internal(Configuration conf) throws Exception {
        initConf(conf);
        long[] distribution = {50L * CAPACITY / 100L, 10L * CAPACITY / 100L};
        long[] capacities = {CAPACITY, CAPACITY};
        String[] racks = {RACK0, RACK1};
        testUnevenDistribution(conf, distribution, capacities, racks);
    }

    /**
     * Runs the uneven-distribution scenario with
     * {@code dfs.datanode.balance.max.concurrent.moves} set to 0.
     */
    @Test(timeout=100000L)
    public void testBalancerWithZeroThreadsForMove() throws Exception {
        Configuration conf = new HdfsConfiguration();
        final int maxConcurrentMoves = 0;
        conf.setInt("dfs.datanode.balance.max.concurrent.moves", maxConcurrentMoves);
        testBalancer1Internal(conf);
    }

    /**
     * Runs the uneven-distribution scenario with
     * {@code dfs.datanode.balance.max.concurrent.moves} set to 8.
     */
    @Test(timeout=100000L)
    public void testBalancerWithNonZeroThreadsForMove() throws Exception {
        Configuration conf = new HdfsConfiguration();
        final int maxConcurrentMoves = 8;
        conf.setInt("dfs.datanode.balance.max.concurrent.moves", maxConcurrentMoves);
        testBalancer1Internal(conf);
    }

    /** Runs {@link #testBalancer2Internal} with a fresh HDFS configuration. */
    @Test(timeout=100000L)
    public void testBalancer2() throws Exception {
        Configuration conf = new HdfsConfiguration();
        testBalancer2Internal(conf);
    }

    /**
     * Applies the shared test settings and runs the default-constructor
     * scenario: two datanodes of capacity {@link #CAPACITY} on racks 0 and 1,
     * plus an added datanode of the same capacity on rack 2.
     *
     * @param conf configuration to initialise and run with
     */
    void testBalancer2Internal(Configuration conf) throws Exception {
        initConf(conf);
        long[] capacities = {CAPACITY, CAPACITY};
        String[] racks = {RACK0, RACK1};
        testBalancerDefaultConstructor(conf, capacities, racks, CAPACITY, RACK2);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts a cluster with the given datanodes, writes a file so that 3/10 of
     * the initial total capacity is used, adds one more datanode and runs the
     * balancer through {@link #runBalancer}. The cluster is shut down
     * afterwards in every case.
     *
     * @param conf        configuration for the cluster and the balancer
     * @param capacities  simulated capacity of each initial datanode
     * @param racks       rack of each initial datanode; same length as {@code capacities}
     * @param newCapacity simulated capacity of the datanode added later
     * @param newRack     rack of the datanode added later
     */
    private void testBalancerDefaultConstructor(Configuration conf, long[] capacities, String[] racks, long newCapacity, String newRack) throws Exception {
        final int numOfDatanodes = capacities.length;
        Assert.assertEquals((long) numOfDatanodes, (long) racks.length);

        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
        cluster = builder.numDataNodes(numOfDatanodes).racks(racks).simulatedCapacities(capacities).build();
        try {
            cluster.waitActive();
            URI nameNodeUri = cluster.getFileSystem(0).getUri();
            client = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class).getProxy();

            final long initialCapacity = sum(capacities);
            final long usedSpace = initialCapacity * 3L / 10L;
            // Replicating to every initial datanode makes each of them hold usedSpace / n bytes.
            createFile(cluster, filePath, usedSpace / (long) numOfDatanodes, (short) numOfDatanodes, 0);

            cluster.startDataNodes(conf, 1, true, null, new String[]{newRack}, new long[]{newCapacity});
            final long grownCapacity = initialCapacity + newCapacity;
            runBalancer(conf, usedSpace, grownCapacity);
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Runs the one-node scenario through the balancer command-line tool, which
     * is required to return 0.
     */
    @Test(timeout=100000L)
    public void testExitZeroOnSuccess() throws Exception {
        Configuration conf = new HdfsConfiguration();
        initConf(conf);
        final boolean useTool = true;
        oneNodeTest(conf, useTool);
    }

    /**
     * Runs testBalancer0, testBalancer1 and testBalancer2 in that order on a
     * single instance, outside of a test runner.
     *
     * @param args ignored
     */
    public static void main(String[] args) throws Exception {
        final TestBalancer suite = new TestBalancer();
        suite.testBalancer0();
        suite.testBalancer1();
        suite.testBalancer2();
    }

    // Class-wide setup: log everything the balancer emits, initialise the shared
    // path and random source, and set the balancer's block-move wait time to 1000.
    static {
        Log4JLogger balancerLog = (Log4JLogger) Balancer.LOG;
        balancerLog.getLogger().setLevel(Level.ALL);
        filePath = new Path(fileName);
        r = new Random();
        Balancer.setBlockMoveWaitTime(1000L);
    }
}

