/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import com.google.common.annotations.VisibleForTesting;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.BlockReaderFactory;
import org.apache.hadoop.hdfs.BlockReaderLocal;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DomainSocketFactory;
import org.apache.hadoop.hdfs.FileInputStreamCache;
import org.apache.hadoop.hdfs.PeerCache;
import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;

@InterfaceAudience.Private
public class DFSInputStream
extends FSInputStream
implements ByteBufferReadable,
CanSetDropBehind,
CanSetReadahead {
    /** Test hook: when true, getBlockReader throws "TCP reads are disabled." instead of opening a new TCP peer. */
    @VisibleForTesting
    static boolean tcpReadsDisabledForTesting = false;
    /** Peer cache taken from the owning client in the constructor. */
    private final PeerCache peerCache;
    /** Client used for namenode lookups, configuration and open/closed checks. */
    private final DFSClient dfsClient;
    /** Set to true by close(); readWithStrategy rejects reads once it is set. */
    private boolean closed = false;
    /** Path of the file, passed to dfsClient.getLocatedBlocks and used in log/error messages. */
    private final String src;
    /** Reader for the current block; replaced by blockSeekTo and released by close(). */
    private BlockReader blockReader = null;
    /** Passed through to getBlockReader when a reader is created. */
    private final boolean verifyChecksum;
    /** Most recent block list obtained from dfsClient.getLocatedBlocks; extended in place via insertRange. */
    private LocatedBlocks locatedBlocks = null;
    /**
     * Length returned for an incomplete last block (0 if the last block is complete).
     * Added to locatedBlocks.getFileLength() by getFileLength(); -1 while its locations are unknown.
     */
    private long lastBlockBeingWrittenLength = 0L;
    /** Datanode returned by the last blockSeekTo; reset to null whenever block locations are re-fetched. */
    private DatanodeInfo currentNode = null;
    /** Block recorded by getBlockAt when it is asked to update the position. */
    private LocatedBlock currentLocatedBlock = null;
    /** Current read offset in the file. */
    private long pos = 0L;
    /** Offset of the last byte of the current block; -1 forces a new blockSeekTo on the next read. */
    private long blockEnd = -1L;
    /** Copy of the client's default read caching strategy, handed to BlockReaderFactory. */
    private CachingStrategy cachingStrategy;
    /** Counters passed to ReaderStrategy.doRead. */
    private final ReadStatistics readStatistics = new ReadStatistics();
    /** Per-stream cache consulted first by getBlockReader; closed together with the stream. */
    private final FileInputStreamCache fileInputStreamCache;
    /** Number of failed chooseDataNode rounds; reset at the start of every readWithStrategy call. */
    private int failures = 0;
    /** Datanodes excluded from selection; filled by addToDeadNodes and cleared by chooseDataNode before a retry. */
    private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes = new ConcurrentHashMap();
    /** Buffer size supplied to the constructor and forwarded to getBlockReader. */
    private int buffersize = 1;
    /** Scratch buffer used by the single-byte read(). */
    private final byte[] oneByteBuf = new byte[1];

    /**
     * Adds a datanode to the dead-node map that chooseDataNode passes to bestNode.
     *
     * @param dnInfo the datanode to record; it is stored as both key and value
     */
    void addToDeadNodes(DatanodeInfo dnInfo) {
        deadNodes.put(dnInfo, dnInfo);
    }

    /**
     * Creates a stream over {@code src} and immediately loads its block locations.
     *
     * @param dfsClient      owning client; supplies the peer cache, configuration and caching strategy
     * @param src            path of the file to read
     * @param buffersize     buffer size forwarded to block readers
     * @param verifyChecksum whether block readers should verify checksums
     * @throws IOException if the block locations cannot be obtained by openInfo()
     */
    DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum) throws IOException, UnresolvedLinkException {
        // Plain parameter captures first.
        this.src = src;
        this.buffersize = buffersize;
        this.verifyChecksum = verifyChecksum;
        this.dfsClient = dfsClient;

        // State derived from the client, in the same order as before.
        this.peerCache = dfsClient.peerCache;
        this.fileInputStreamCache = new FileInputStreamCache(
                dfsClient.getConf().shortCircuitStreamsCacheSize,
                dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
        this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy().duplicate();

        openInfo();
    }

    /**
     * Fetches the block locations of the file and the visible length of its last block.
     *
     * <p>A length of -1 means the last block currently has no reported locations. In that
     * case the lookup is retried up to three more times, sleeping four seconds before
     * each retry.
     *
     * @throws IOException if the last block length is still unavailable after all retries,
     *                     or if fetching the locations fails
     */
    synchronized void openInfo() throws IOException, UnresolvedLinkException {
        this.lastBlockBeingWrittenLength = this.fetchLocatedBlocksAndGetLastBlockLength();
        int retriesForLastBlockLength = 3;
        while (retriesForLastBlockLength > 0 && this.lastBlockBeingWrittenLength == -1L) {
            DFSClient.LOG.warn("Last block locations not available. Datanodes might not have reported blocks completely. Will retry for " + retriesForLastBlockLength + " times");
            this.waitFor(4000);
            this.lastBlockBeingWrittenLength = this.fetchLocatedBlocksAndGetLastBlockLength();
            --retriesForLastBlockLength;
        }
        // Decide on the outcome, not on the counter: a successful final retry also leaves
        // the counter at zero and must not be reported as a failure.
        if (this.lastBlockBeingWrittenLength == -1L) {
            throw new IOException("Could not obtain the last block locations.");
        }
    }

    /**
     * Sleeps for the given time, converting an interruption into an {@link IOException}.
     *
     * @param waitTime time to sleep, in milliseconds
     * @throws IOException if the thread is interrupted while sleeping; the interrupt flag is
     *                     restored and the {@link InterruptedException} is attached as the cause
     */
    private void waitFor(int waitTime) throws IOException {
        try {
            Thread.sleep(waitTime);
        }
        catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while getting the last block length.", e);
        }
    }

    /**
     * Re-reads the block list of the file from the client and determines the visible
     * length of the last block if that block is not yet complete.
     *
     * @return 0 when the last block is complete (or absent), the length reported by
     *         readBlockLength when it is incomplete, or -1 when the incomplete last block
     *         has no locations
     * @throws IOException if the file cannot be opened, or if a block in the new list differs
     *                     from the block at the same position in the previously cached list
     */
    private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
        final LocatedBlocks fetched = dfsClient.getLocatedBlocks(src, 0L);
        if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("newInfo = " + fetched);
        }
        if (fetched == null) {
            throw new IOException("Cannot open filename " + src);
        }

        // Compare the common prefix of the cached and the fresh list block by block.
        if (locatedBlocks != null) {
            Iterator<LocatedBlock> cachedIt = locatedBlocks.getLocatedBlocks().iterator();
            Iterator<LocatedBlock> freshIt = fetched.getLocatedBlocks().iterator();
            while (cachedIt.hasNext() && freshIt.hasNext()) {
                ExtendedBlock cachedBlock = cachedIt.next().getBlock();
                ExtendedBlock freshBlock = freshIt.next().getBlock();
                if (!cachedBlock.equals(freshBlock)) {
                    throw new IOException("Blocklist for " + src + " has changed!");
                }
            }
        }
        locatedBlocks = fetched;

        long visibleLength = 0L;
        if (!locatedBlocks.isLastBlockComplete()) {
            LocatedBlock tail = locatedBlocks.getLastLocatedBlock();
            if (tail != null) {
                if (tail.getLocations().length == 0) {
                    // No datanode is known for the last block yet; the caller decides whether to retry.
                    return -1L;
                }
                visibleLength = readBlockLength(tail);
                tail.getBlock().setNumBytes(visibleLength);
            }
        }
        currentNode = null;
        return visibleLength;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private long readBlockLength(LocatedBlock locatedblock) throws IOException {
        assert (locatedblock != null) : "LocatedBlock cannot be null";
        int replicaNotFoundCount = locatedblock.getLocations().length;
        DatanodeInfo[] arr$ = locatedblock.getLocations();
        int len$ = arr$.length;
        int i$ = 0;
        while (true) {
            block13: {
                ClientDatanodeProtocol cdp;
                block14: {
                    long l;
                    block12: {
                        if (i$ >= len$) {
                            if (replicaNotFoundCount != 0) throw new IOException("Cannot obtain block length for " + locatedblock);
                            return 0L;
                        }
                        DatanodeInfo datanode = arr$[i$];
                        cdp = null;
                        try {
                            cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, this.dfsClient.getConfiguration(), this.dfsClient.getConf().socketTimeout, this.dfsClient.getConf().connectToDnViaHostname, locatedblock);
                            long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
                            if (n >= 0L) {
                                l = n;
                                if (cdp == null) return l;
                                break block12;
                            }
                            if (cdp == null) break block13;
                            break block14;
                        }
                        catch (IOException ioe) {
                            if (ioe instanceof RemoteException && ((RemoteException)ioe).unwrapRemoteException() instanceof ReplicaNotFoundException) {
                                --replicaNotFoundCount;
                            }
                            if (DFSClient.LOG.isDebugEnabled()) {
                                DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode " + datanode + " for block " + locatedblock.getBlock(), ioe);
                            }
                            break block13;
                        }
                    }
                    RPC.stopProxy(cdp);
                    return l;
                }
                RPC.stopProxy(cdp);
                break block13;
                finally {
                    if (cdp != null) {
                        RPC.stopProxy(cdp);
                    }
                }
            }
            ++i$;
        }
    }

    /**
     * Returns the readable length of the file.
     *
     * @return 0 if no block list has been loaded yet; otherwise the length reported by the
     *         block list plus the length of the last block being written
     */
    public synchronized long getFileLength() {
        if (locatedBlocks == null) {
            return 0L;
        }
        return locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
    }

    /**
     * Tells whether short-circuit reads must be avoided for this file.
     *
     * @return true when the cached block list reports the file as under construction
     */
    synchronized boolean shortCircuitForbidden() {
        final boolean underConstruction = locatedBlocks.isUnderConstruction();
        return underConstruction;
    }

    /**
     * Returns the datanode currently used for reading.
     *
     * @return the current datanode, or null if none has been selected
     */
    public DatanodeInfo getCurrentDatanode() {
        return currentNode;
    }

    /**
     * Returns the block the stream is currently positioned in.
     *
     * @return the block of the current located block, or null if no block has been selected
     */
    public synchronized ExtendedBlock getCurrentBlock() {
        return currentLocatedBlock != null ? currentLocatedBlock.getBlock() : null;
    }

    /**
     * Returns the located blocks covering the whole file.
     *
     * @return the blocks for the range from offset 0 up to the current file length
     * @throws IOException if the range lookup fails
     */
    public synchronized List<LocatedBlock> getAllBlocks() throws IOException {
        final long fileLength = getFileLength();
        return getBlockRange(0L, fileLength);
    }

    /**
     * Returns the located block containing {@code offset}, fetching it from the namenode
     * when it is not in the cached block list.
     *
     * <p>Offsets at or beyond the length reported by the block list (but below
     * {@link #getFileLength()}) resolve to the last located block.
     *
     * @param offset         file offset to look up
     * @param updatePosition when true, also sets the current position, the end offset of the
     *                       current block and the current located block
     * @return the block containing {@code offset}
     * @throws IOException if {@code offset} is negative or not below the file length, or if the
     *                     namenode returns no blocks for the offset
     */
    private synchronized LocatedBlock getBlockAt(long offset, boolean updatePosition) throws IOException {
        assert (this.locatedBlocks != null) : "locatedBlocks is null";
        if (offset < 0L || offset >= this.getFileLength()) {
            throw new IOException("offset < 0 || offset > getFileLength(), offset=" + offset + ", updatePosition=" + updatePosition + ", locatedBlocks=" + this.locatedBlocks);
        }
        final LocatedBlock blk;
        if (offset >= this.locatedBlocks.getFileLength()) {
            // Offset lies in the block that is still being written.
            blk = this.locatedBlocks.getLastLocatedBlock();
        } else {
            int targetBlockIdx = this.locatedBlocks.findBlock(offset);
            if (targetBlockIdx < 0) {
                // Not cached: fetch the blocks for this offset and splice them into the list.
                targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
                LocatedBlocks newBlocks = this.dfsClient.getLocatedBlocks(this.src, offset);
                // Explicit check rather than an assert so the failure is an IOException
                // regardless of whether assertions are enabled.
                if (newBlocks == null) {
                    throw new IOException("Could not find target position " + offset);
                }
                this.locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
            }
            blk = this.locatedBlocks.get(targetBlockIdx);
        }
        if (updatePosition) {
            this.pos = offset;
            this.blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1L;
            this.currentLocatedBlock = blk;
        }
        return blk;
    }

    /**
     * Fetches the located blocks for {@code offset} from the namenode and inserts them into
     * the cached block list, regardless of whether the offset was already cached.
     *
     * @param offset file offset whose block information should be refreshed
     * @throws IOException if the namenode returns no blocks for the offset
     */
    private synchronized void fetchBlockAt(long offset) throws IOException {
        int insertAt = locatedBlocks.findBlock(offset);
        if (insertAt < 0) {
            insertAt = LocatedBlocks.getInsertIndex(insertAt);
        }
        final LocatedBlocks refreshed = dfsClient.getLocatedBlocks(src, offset);
        if (refreshed == null) {
            throw new IOException("Could not find target position " + offset);
        }
        locatedBlocks.insertRange(insertAt, refreshed.getLocatedBlocks());
    }

    /**
     * Returns the located blocks that cover {@code length} bytes starting at {@code offset}.
     *
     * <p>The part of the range below the length reported by the block list is resolved via
     * getFinalizedBlockRange. If the range extends past that length, the last located block
     * is appended.
     *
     * @param offset start of the range
     * @param length number of bytes in the range
     * @return the blocks covering the range, in order
     * @throws IOException if {@code offset} is not below the file length
     */
    private synchronized List<LocatedBlock> getBlockRange(long offset, long length) throws IOException {
        if (offset >= getFileLength()) {
            throw new IOException("Offset: " + offset + " exceeds file length: " + getFileLength());
        }
        final long completeLength = locatedBlocks.getFileLength();
        final boolean reachesPastComplete = offset + length > completeLength;

        List<LocatedBlock> result;
        if (offset < completeLength) {
            long finalizedSpan = Math.min(length, completeLength - offset);
            result = getFinalizedBlockRange(offset, finalizedSpan);
        } else {
            result = new ArrayList<LocatedBlock>(1);
        }

        if (reachesPastComplete) {
            result.add(locatedBlocks.getLastLocatedBlock());
        }
        return result;
    }

    /**
     * Collects the located blocks covering {@code length} bytes from {@code offset}, fetching
     * missing entries from the namenode and inserting them into the cached block list.
     *
     * @param offset start of the range
     * @param length number of bytes to cover
     * @return the blocks covering the range, in order
     * @throws IOException if a namenode lookup fails
     */
    private synchronized List<LocatedBlock> getFinalizedBlockRange(long offset, long length) throws IOException {
        assert (locatedBlocks != null) : "locatedBlocks is null";
        final ArrayList<LocatedBlock> collected = new ArrayList<LocatedBlock>();

        int idx = locatedBlocks.findBlock(offset);
        if (idx < 0) {
            idx = LocatedBlocks.getInsertIndex(idx);
        }

        long cursor = offset;
        long left = length;
        while (left > 0L) {
            LocatedBlock candidate = idx < locatedBlocks.locatedBlockCount() ? locatedBlocks.get(idx) : null;
            if (candidate == null || cursor < candidate.getStartOffset()) {
                // Gap in the cache: load the blocks for the remaining range and look again.
                LocatedBlocks fetched = dfsClient.getLocatedBlocks(src, cursor, left);
                locatedBlocks.insertRange(idx, fetched.getLocatedBlocks());
            } else {
                assert (cursor >= candidate.getStartOffset()) : "Block not found";
                collected.add(candidate);
                // Bytes of the range that this block accounts for.
                long covered = candidate.getStartOffset() + candidate.getBlockSize() - cursor;
                left -= covered;
                cursor += covered;
                ++idx;
            }
        }
        return collected;
    }

    /**
     * Positions the stream at {@code target} and opens a block reader for the block that
     * contains it.
     *
     * <p>Any existing block reader is closed first. The method then loops until a reader has
     * been created, handling failures as follows:
     * <ul>
     *   <li>{@code AccessControlException}: legacy local block reading is disabled and the
     *       attempt is repeated.</li>
     *   <li>{@code InvalidEncryptionKeyException} (once): the client's data encryption key is
     *       cleared and the attempt is repeated.</li>
     *   <li>{@code InvalidBlockTokenException} (once): the block information is re-fetched
     *       and the attempt is repeated.</li>
     *   <li>Any other {@code IOException}: the chosen datanode is added to the dead nodes and
     *       another datanode is tried.</li>
     * </ul>
     * The loop is left only by returning or by an exception thrown from getBlockAt or
     * chooseDataNode.
     *
     * @param target file offset to seek to
     * @return the datanode the new block reader is connected to
     * @throws IOException if {@code target} is at or past the end of the file, or if block
     *                     lookup / datanode selection fails
     */
    private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
        if (target >= this.getFileLength()) {
            throw new IOException("Attempted to read past end of file");
        }
        if (this.blockReader != null) {
            this.blockReader.close();
            this.blockReader = null;
        }
        int refetchToken = 1;
        int refetchEncryptionKey = 1;
        boolean connectFailedOnce = false;
        while (true) {
            // Re-resolve the block on every pass: the cached locations may have been refreshed.
            LocatedBlock targetBlock = this.getBlockAt(target, true);
            assert (target == this.pos) : "Wrong postion " + this.pos + " expect " + target;
            long offsetIntoBlock = target - targetBlock.getStartOffset();
            DNAddrPair retval = this.chooseDataNode(targetBlock);
            DatanodeInfo chosenNode = retval.info;
            InetSocketAddress targetAddr = retval.addr;
            try {
                ExtendedBlock blk = targetBlock.getBlock();
                Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
                this.blockReader = this.getBlockReader(targetAddr, chosenNode, this.src, blk, accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock, this.buffersize, this.verifyChecksum, this.dfsClient.clientName);
                if (connectFailedOnce) {
                    DFSClient.LOG.info("Successfully connected to " + targetAddr + " for " + blk);
                }
                return chosenNode;
            }
            catch (AccessControlException ex) {
                DFSClient.LOG.warn("Short circuit access failed " + ex);
                this.dfsClient.disableLegacyBlockReaderLocal();
            }
            catch (IOException ex) {
                if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
                    DFSClient.LOG.info("Will fetch a new encryption key and retry, encryption key was invalid when connecting to " + targetAddr + " : " + ex);
                    --refetchEncryptionKey;
                    this.dfsClient.clearDataEncryptionKey();
                } else if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
                    DFSClient.LOG.info("Will fetch a new access token and retry, access token was invalid when connecting to " + targetAddr + " : " + ex);
                    --refetchToken;
                    this.fetchBlockAt(target);
                } else {
                    connectFailedOnce = true;
                    DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block" + ", add to deadNodes and continue. " + ex, ex);
                    this.addToDeadNodes(chosenNode);
                }
            }
        }
    }

    /**
     * Closes the stream: releases the current block reader, closes the superclass stream and
     * the file-input-stream cache, and marks the stream closed. Calling it again is a no-op.
     *
     * @throws IOException if the client's open check fails or closing a resource fails
     */
    @Override
    public synchronized void close() throws IOException {
        if (!closed) {
            dfsClient.checkOpen();
            final BlockReader activeReader = blockReader;
            if (activeReader != null) {
                activeReader.close();
                blockReader = null;
            }
            super.close();
            fileInputStreamCache.close();
            closed = true;
        }
    }

    /**
     * Reads a single byte.
     *
     * @return the byte as a value in the range 0..255, or -1 if nothing was read
     * @throws IOException if the underlying read fails
     */
    @Override
    public synchronized int read() throws IOException {
        final int count = read(oneByteBuf, 0, 1);
        if (count <= 0) {
            return -1;
        }
        // Mask to present the signed byte as an unsigned value.
        return oneByteBuf[0] & 0xFF;
    }

    /**
     * Adds {@code nRead} bytes to the counters that apply to the given reader.
     *
     * <p>The total is always increased; the local counter is increased for short-circuit or
     * local readers; the short-circuit counter only for short-circuit readers. Nothing is
     * changed when {@code nRead} is not positive.
     *
     * @param readStatistics counters to update
     * @param nRead          number of bytes just read
     * @param blockReader    reader that produced the bytes
     */
    private static void updateReadStatistics(ReadStatistics readStatistics, int nRead, BlockReader blockReader) {
        if (nRead > 0) {
            // isLocal() is only consulted when the reader is not short-circuit, as before.
            final boolean viaShortCircuit = blockReader.isShortCircuit();
            final boolean fromLocalNode = viaShortCircuit || blockReader.isLocal();

            readStatistics.totalBytesRead += nRead;
            if (fromLocalNode) {
                readStatistics.totalLocalBytesRead += nRead;
            }
            if (viaShortCircuit) {
                readStatistics.totalShortCircuitBytesRead += nRead;
            }
        }
    }

    /**
     * Reads through {@code reader} from the current block reader, switching source on failure.
     *
     * <p>After the first plain I/O failure the same block source is re-sought via
     * seekToBlockSource. After a checksum failure, or any later failure, the current node is
     * added to the dead nodes and seekToNewSource is used. When no source can be found the
     * most recent exception is rethrown.
     *
     * @param reader            strategy that performs the actual read
     * @param off               offset forwarded to the strategy
     * @param len               length forwarded to the strategy
     * @param corruptedBlockMap receives the block/datanode pair on checksum failures
     * @return the value returned by the strategy
     * @throws IOException the last read failure when no further source is available
     */
    private synchronized int readBuffer(ReaderStrategy reader, int off, int len, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) throws IOException {
        boolean mayRetrySameNode = true;
        for (;;) {
            IOException lastFailure;
            try {
                return reader.doRead(blockReader, off, len, readStatistics);
            }
            catch (ChecksumException ce) {
                DFSClient.LOG.warn("Found Checksum error for " + getCurrentBlock() + " from " + currentNode + " at " + ce.getPos());
                lastFailure = ce;
                mayRetrySameNode = false;
                addIntoCorruptedBlockMap(getCurrentBlock(), currentNode, corruptedBlockMap);
            }
            catch (IOException e) {
                if (!mayRetrySameNode) {
                    DFSClient.LOG.warn("Exception while reading from " + getCurrentBlock() + " of " + src + " from " + currentNode, e);
                }
                lastFailure = e;
            }

            final boolean foundSource;
            if (!mayRetrySameNode) {
                addToDeadNodes(currentNode);
                foundSource = seekToNewSource(pos);
            } else {
                foundSource = seekToBlockSource(pos);
            }
            if (!foundSource) {
                throw lastFailure;
            }
            // Only the very first failure is allowed to retry the same node.
            mayRetrySameNode = false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads up to {@code len} bytes at the current position using the given strategy.
     *
     * <p>A read never crosses a block boundary: the requested length is capped at the number
     * of bytes left in the current block. On an {@code IOException} other than a
     * {@code ChecksumException} the current node is added to the dead nodes, a re-seek is
     * forced, and the read is attempted once more. Checksum failures collected during the
     * attempt are reported after every attempt.
     *
     * @param strategy reader strategy that moves the bytes into the caller's buffer
     * @param off      offset forwarded to the strategy
     * @param len      maximum number of bytes to read
     * @return the number of bytes read, or -1 when the position is at or past the file length
     * @throws IOException if the client or stream is closed, or if the read fails on both attempts
     */
    private int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
        this.dfsClient.checkOpen();
        if (this.closed) {
            throw new IOException("Stream closed");
        }
        HashMap<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
        this.failures = 0;
        if (this.pos >= this.getFileLength()) {
            return -1;
        }
        int retries = 2;
        while (retries > 0) {
            try {
                if (this.pos > this.blockEnd || this.currentNode == null) {
                    this.currentNode = this.blockSeekTo(this.pos);
                }
                // Never read past the end of the current block.
                int realLen = (int)Math.min((long)len, this.blockEnd - this.pos + 1L);
                int result = this.readBuffer(strategy, off, realLen, corruptedBlockMap);
                if (result < 0) {
                    // Thrown inside the try on purpose so that it takes the retry path below.
                    throw new IOException("Unexpected EOS from the reader");
                }
                this.pos += (long)result;
                if (this.dfsClient.stats != null) {
                    this.dfsClient.stats.incrementBytesRead(result);
                }
                return result;
            }
            catch (ChecksumException ce) {
                // Checksum failures are not retried here.
                throw ce;
            }
            catch (IOException e) {
                if (retries == 1) {
                    DFSClient.LOG.warn("DFS Read", e);
                }
                this.blockEnd = -1L;
                if (this.currentNode != null) {
                    this.addToDeadNodes(this.currentNode);
                }
                if (--retries == 0) {
                    throw e;
                }
            }
            finally {
                // currentLocatedBlock is still null when the very first seek failed before a
                // block was selected; dereferencing it here would replace the real failure
                // with a NullPointerException. In that situation no read has happened in this
                // call, so corruptedBlockMap has no entries.
                if (this.currentLocatedBlock != null) {
                    this.reportCheckSumFailure(corruptedBlockMap, this.currentLocatedBlock.getLocations().length);
                }
            }
        }
        return -1;
    }

    /**
     * Reads up to {@code len} bytes into {@code buf} starting at {@code off}.
     *
     * @param buf destination array
     * @param off offset forwarded to the read strategy
     * @param len maximum number of bytes to read
     * @return the value returned by readWithStrategy
     * @throws IOException if the read fails
     */
    @Override
    public synchronized int read(byte[] buf, int off, int len) throws IOException {
        return readWithStrategy(new ByteArrayStrategy(buf), off, len);
    }

    /**
     * Reads into {@code buf}, requesting at most {@code buf.remaining()} bytes.
     *
     * @param buf destination buffer
     * @return the value returned by readWithStrategy
     * @throws IOException if the read fails
     */
    @Override
    public synchronized int read(ByteBuffer buf) throws IOException {
        return readWithStrategy(new ByteBufferStrategy(buf), 0, buf.remaining());
    }

    /**
     * Records that {@code node} returned corrupt data for {@code blk}.
     *
     * @param blk               the block for which a checksum failure was seen
     * @param node              the datanode that served the data
     * @param corruptedBlockMap map from block to the set of datanodes recorded for it; a set is
     *                          created and stored when the block has no entry yet
     */
    private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
        Set<DatanodeInfo> dnSet = corruptedBlockMap.get(blk);
        if (dnSet == null) {
            dnSet = new HashSet<DatanodeInfo>();
            corruptedBlockMap.put(blk, dnSet);
        }
        // Set semantics make a repeated report for the same node a no-op.
        dnSet.add(node);
    }

    /**
     * Picks a datanode for {@code block} that is not in the dead-node map and resolves its
     * transfer address.
     *
     * <p>When no node can be chosen, the method sleeps for a randomized time that grows with
     * the number of failures, clears the dead nodes, reloads the block locations and tries
     * again. It gives up once the failure count has reached the client's maximum.
     *
     * @param block the block to read; re-resolved after every reload of the block locations
     * @return the chosen datanode together with its socket address
     * @throws IOException a {@code BlockMissingException} when the failure limit is reached, or
     *                     any error raised while reloading block locations
     */
    private DNAddrPair chooseDataNode(LocatedBlock block) throws IOException {
        while (true) {
            DatanodeInfo[] nodes = block.getLocations();
            try {
                DatanodeInfo chosenNode = DFSInputStream.bestNode(nodes, this.deadNodes);
                String dnAddr = chosenNode.getXferAddr(this.dfsClient.getConf().connectToDnViaHostname);
                if (DFSClient.LOG.isDebugEnabled()) {
                    DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
                }
                InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
                return new DNAddrPair(chosenNode, targetAddr);
            }
            catch (IOException ie) {
                String blockInfo = block.getBlock() + " file=" + this.src;
                if (this.failures >= this.dfsClient.getMaxBlockAcquireFailures()) {
                    throw new BlockMissingException(this.src, "Could not obtain block: " + blockInfo, block.getStartOffset());
                }
                if (nodes == null || nodes.length == 0) {
                    DFSClient.LOG.info("No node available for " + blockInfo);
                }
                DFSClient.LOG.info("Could not obtain " + block.getBlock() + " from any node: " + ie + ". Will get new block locations from namenode and retry...");
                try {
                    // Wait between failures*window and (2*failures + 1)*window milliseconds.
                    int timeWindow = this.dfsClient.getConf().timeWindow;
                    double waitTime = (double)(timeWindow * this.failures) + (double)(timeWindow * (this.failures + 1)) * DFSUtil.getRandom().nextDouble();
                    DFSClient.LOG.warn("DFS chooseDataNode: got # " + (this.failures + 1) + " IOException, will wait for " + waitTime + " msec.");
                    Thread.sleep((long)waitTime);
                }
                catch (InterruptedException iex) {
                    // Cut the wait short but keep the interruption visible to the caller.
                    Thread.currentThread().interrupt();
                }
                // Start over with fresh locations and an empty dead-node map.
                this.deadNodes.clear();
                this.openInfo();
                block = this.getBlockAt(block.getStartOffset(), false);
                ++this.failures;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads the byte range {@code start..end} (inclusive) of {@code block} into {@code buf}.
     *
     * <p>A fresh block reader is created for every attempt and always closed afterwards. The
     * method returns as soon as one attempt has read the full range. Failures are handled as
     * follows:
     * <ul>
     *   <li>{@code ChecksumException}: the block/datanode pair is recorded in
     *       {@code corruptedBlockMap} and the datanode is added to the dead nodes.</li>
     *   <li>{@code AccessControlException}: legacy local block reading is disabled and the
     *       attempt is repeated.</li>
     *   <li>{@code InvalidEncryptionKeyException} (once): the client's data encryption key is
     *       cleared; the datanode is then added to the dead nodes.</li>
     *   <li>{@code InvalidBlockTokenException} (once): the block information is re-fetched
     *       and the attempt is repeated.</li>
     *   <li>Any other {@code IOException}: the datanode is added to the dead nodes.</li>
     * </ul>
     *
     * @param block             block to read from; re-resolved at the start of every attempt
     * @param start             first byte of the range, forwarded to getBlockReader as start offset
     * @param end               last byte of the range (inclusive)
     * @param buf               destination array
     * @param offset            position in {@code buf} at which to store the data
     * @param corruptedBlockMap receives block/datanode pairs for checksum failures
     * @throws IOException if block lookup or datanode selection fails
     */
    private void fetchBlockByteRange(LocatedBlock block, long start, long end, byte[] buf, int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) throws IOException {
        int refetchToken = 1;
        int refetchEncryptionKey = 1;
        while (true) {
            // Cached locations may have changed since the previous attempt.
            block = this.getBlockAt(block.getStartOffset(), false);
            DNAddrPair retval = this.chooseDataNode(block);
            DatanodeInfo chosenNode = retval.info;
            InetSocketAddress targetAddr = retval.addr;
            BlockReader reader = null;
            try {
                Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
                int len = (int)(end - start + 1L);
                reader = this.getBlockReader(targetAddr, chosenNode, this.src, block.getBlock(), blockToken, start, len, this.buffersize, this.verifyChecksum, this.dfsClient.clientName);
                int nread = reader.readAll(buf, offset, len);
                if (nread != len) {
                    throw new IOException("truncated return from reader.read(): excpected " + len + ", got " + nread);
                }
                return;
            }
            catch (ChecksumException e) {
                DFSClient.LOG.warn("fetchBlockByteRange(). Got a checksum exception for " + this.src + " at " + block.getBlock() + ":" + e.getPos() + " from " + chosenNode);
                this.addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
            }
            catch (AccessControlException ex) {
                DFSClient.LOG.warn("Short circuit access failed " + ex);
                this.dfsClient.disableLegacyBlockReaderLocal();
                continue;
            }
            catch (IOException e) {
                if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
                    DFSClient.LOG.info("Will fetch a new encryption key and retry, encryption key was invalid when connecting to " + targetAddr + " : " + e);
                    --refetchEncryptionKey;
                    this.dfsClient.clearDataEncryptionKey();
                    // NOTE(review): this path still falls through to addToDeadNodes below, unlike
                    // the same case in blockSeekTo, which retries without marking the node dead.
                    // Confirm whether that difference is intended.
                } else if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
                    DFSClient.LOG.info("Will get a new access token and retry, access token was invalid when connecting to " + targetAddr + " : " + e);
                    --refetchToken;
                    this.fetchBlockAt(block.getStartOffset());
                    continue;
                } else {
                    DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for file " + this.src + " for block " + block.getBlock() + ":" + e);
                    if (DFSClient.LOG.isDebugEnabled()) {
                        DFSClient.LOG.debug("Connection failure ", e);
                    }
                }
            }
            finally {
                // Only release the reader here; a jump out of finally would discard the
                // pending return or continue of the try/catch above.
                if (reader != null) {
                    reader.close();
                }
            }
            // Reached after a checksum error, a refreshed encryption key or a connect failure.
            this.addToDeadNodes(chosenNode);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Opens a new TCP connection to {@code addr} and wraps it in a {@link Peer}.
     *
     * <p>If anything fails before the peer has been fully set up, the partially created peer
     * and socket are closed quietly and the failure propagates to the caller.
     *
     * @param addr address of the datanode to connect to
     * @return the connected peer
     * @throws IOException if creating, connecting or wrapping the socket fails
     */
    private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
        Peer peer = null;
        boolean success = false;
        Socket sock = null;
        try {
            sock = this.dfsClient.socketFactory.createSocket();
            NetUtils.connect(sock, addr, this.dfsClient.getRandomLocalInterfaceAddr(), this.dfsClient.getConf().socketTimeout);
            peer = TcpPeerServer.peerFromSocketAndKey(sock, this.dfsClient.getDataEncryptionKey());
            success = true;
            return peer;
        }
        finally {
            // Clean up only on failure; on success the caller owns the peer and its socket.
            if (!success) {
                IOUtils.closeQuietly(peer);
                IOUtils.closeQuietly(sock);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected BlockReader getBlockReader(InetSocketAddress dnAddr, DatanodeInfo chosenNode, String file, ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, long startOffset, long len, int bufferSize, boolean verifyChecksum, String clientName) throws IOException {
        Peer peer;
        Peer peer2;
        int cacheTries;
        FileInputStream[] fis = this.fileInputStreamCache.get(chosenNode, block);
        if (fis != null) {
            if (DFSClient.LOG.isDebugEnabled()) {
                DFSClient.LOG.debug("got FileInputStreams for " + block + " from " + "the FileInputStreamCache.");
            }
            return new BlockReaderLocal(this.dfsClient.getConf(), file, block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum, this.fileInputStreamCache);
        }
        if (this.dfsClient.useLegacyBlockReaderLocal() && DFSClient.isLocalAddress(dnAddr) && !this.shortCircuitForbidden()) {
            try {
                return BlockReaderFactory.getLegacyBlockReaderLocal(this.dfsClient, clientName, block, blockToken, chosenNode, startOffset);
            }
            catch (IOException e) {
                DFSClient.LOG.warn("error creating legacy BlockReaderLocal.  Disabling legacy local reads.", e);
                this.dfsClient.disableLegacyBlockReaderLocal();
            }
        }
        DomainSocketFactory dsFactory = this.dfsClient.getDomainSocketFactory();
        BlockReader reader = null;
        int nCachedConnRetry = this.dfsClient.getConf().nCachedConnRetry;
        for (cacheTries = 0; cacheTries < nCachedConnRetry && (peer2 = this.peerCache.get(chosenNode, true)) != null; ++cacheTries) {
            try {
                boolean allowShortCircuitLocalReads = this.dfsClient.getConf().shortCircuitLocalReads && !this.shortCircuitForbidden();
                BlockReader blockReader = reader = BlockReaderFactory.newBlockReader(this.dfsClient.getConf(), file, block, blockToken, startOffset, len, verifyChecksum, clientName, peer2, chosenNode, dsFactory, this.peerCache, this.fileInputStreamCache, allowShortCircuitLocalReads, this.cachingStrategy);
                return blockReader;
            }
            catch (IOException ex) {
                DFSClient.LOG.debug("Error making BlockReader with DomainSocket. Closing stale " + peer2, ex);
                continue;
            }
            finally {
                if (reader == null) {
                    IOUtils.closeQuietly(peer2);
                }
            }
        }
        DomainSocket domSock = dsFactory.create(dnAddr, this);
        if (domSock != null) {
            peer = new DomainPeer(domSock);
            try {
                boolean allowShortCircuitLocalReads = this.dfsClient.getConf().shortCircuitLocalReads && !this.shortCircuitForbidden();
                BlockReader blockReader = reader = BlockReaderFactory.newBlockReader(this.dfsClient.getConf(), file, block, blockToken, startOffset, len, verifyChecksum, clientName, peer, chosenNode, dsFactory, this.peerCache, this.fileInputStreamCache, allowShortCircuitLocalReads, this.cachingStrategy);
                return blockReader;
            }
            catch (IOException e) {
                DFSClient.LOG.warn("failed to connect to " + domSock, e);
            }
            finally {
                if (reader == null) {
                    dsFactory.disableDomainSocketPath(domSock.getPath());
                    IOUtils.closeQuietly(peer);
                }
            }
        }
        while (cacheTries < nCachedConnRetry && (peer = this.peerCache.get(chosenNode, false)) != null) {
            try {
                BlockReader e = reader = BlockReaderFactory.newBlockReader(this.dfsClient.getConf(), file, block, blockToken, startOffset, len, verifyChecksum, clientName, peer, chosenNode, dsFactory, this.peerCache, this.fileInputStreamCache, false, this.cachingStrategy);
                return e;
            }
            catch (IOException ex) {
                DFSClient.LOG.debug("Error making BlockReader. Closing stale " + peer, ex);
            }
            finally {
                if (reader == null) {
                    IOUtils.closeQuietly(peer);
                }
            }
            ++cacheTries;
        }
        if (tcpReadsDisabledForTesting) {
            throw new IOException("TCP reads are disabled.");
        }
        peer = this.newTcpPeer(dnAddr);
        return BlockReaderFactory.newBlockReader(this.dfsClient.getConf(), file, block, blockToken, startOffset, len, verifyChecksum, clientName, peer, chosenNode, dsFactory, this.peerCache, this.fileInputStreamCache, false, this.cachingStrategy);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Positional read: fills {@code buffer} (starting at {@code offset}) with up to
     * {@code length} bytes taken from the file starting at absolute offset
     * {@code position}.
     *
     * <p>The request is clamped to the current file length and then served block by
     * block via {@code fetchBlockByteRange}; after every block, any checksum
     * failures collected in the per-call map are handed to
     * {@code reportCheckSumFailure}, even if the fetch threw.
     *
     * @param position absolute file offset to start reading at
     * @param buffer   destination array
     * @param offset   index in {@code buffer} of the first byte written
     * @param length   maximum number of bytes requested
     * @return the number of bytes read (the clamped length), or {@code -1} when
     *         {@code position} is negative or at/after the file length
     * @throws IOException if the stream is closed ("Stream closed") or a callee fails
     */
    @Override
    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
        this.dfsClient.checkOpen();
        if (this.closed) {
            throw new IOException("Stream closed");
        }
        this.failures = 0;

        final long fileLength = this.getFileLength();
        if (position < 0L || position >= fileLength) {
            return -1;
        }

        // Never read past the end of the file.
        final int realLen = position + (long)length > fileLength ? (int)(fileLength - position) : length;

        final List<LocatedBlock> blocks = this.getBlockRange(position, realLen);
        final HashMap<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();

        long filePos = position;
        int bufPos = offset;
        int bytesLeft = realLen;
        for (LocatedBlock blk : blocks) {
            final long startInBlock = filePos - blk.getStartOffset();
            final long chunk = Math.min((long)bytesLeft, blk.getBlockSize() - startInBlock);
            try {
                // The end offset passed to fetchBlockByteRange is inclusive.
                this.fetchBlockByteRange(blk, startInBlock, startInBlock + chunk - 1L, buffer, bufPos, corruptedBlockMap);
            }
            finally {
                this.reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length);
            }
            bytesLeft -= chunk;
            filePos += chunk;
            bufPos += chunk;
        }
        assert (bytesLeft == 0) : "Wrong number of bytes read.";

        if (this.dfsClient.stats != null) {
            this.dfsClient.stats.incrementBytesRead(realLen);
        }
        return realLen;
    }

    /**
     * Reports checksum failures gathered in {@code corruptedBlockMap} to the
     * client and then empties the map.
     *
     * <p>Only the first entry returned by the map's iterator is inspected. A report
     * is sent when some, but not all, of the block's datanodes are in the bad set,
     * or when the block has exactly one datanode and that one is bad.
     *
     * @param corruptedBlockMap block -> datanodes on which a checksum problem was
     *                          recorded; cleared before returning when non-empty
     * @param dataNodeCount     number of datanode locations of the block just read
     */
    private void reportCheckSumFailure(Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, int dataNodeCount) {
        if (corruptedBlockMap.isEmpty()) {
            return;
        }
        Map.Entry<ExtendedBlock, Set<DatanodeInfo>> first = corruptedBlockMap.entrySet().iterator().next();
        ExtendedBlock blk = first.getKey();
        Set<DatanodeInfo> badNodes = first.getValue();
        int badCount = badNodes.size();

        boolean someButNotAllBad = badCount > 0 && badCount < dataNodeCount;
        boolean onlyReplicaBad = dataNodeCount == 1 && badCount == dataNodeCount;
        if (someButNotAllBad || onlyReplicaBad) {
            DatanodeInfo[] locs = badNodes.toArray(new DatanodeInfo[badCount]);
            LocatedBlock[] lblocks = new LocatedBlock[]{new LocatedBlock(blk, locs)};
            this.dfsClient.reportChecksumFailure(this.src, lblocks);
        }
        corruptedBlockMap.clear();
    }

    /**
     * Skips forward by up to {@code n} bytes by seeking.
     *
     * <p>The skip distance is clamped to the number of bytes between the current
     * position and the file length. The clamp compares {@code n} against that
     * remaining distance instead of computing {@code n + curPos}: the sum overflows
     * for very large {@code n} (for example {@code skip(Long.MAX_VALUE)} at a
     * non-zero position), which would bypass the clamp and hand a negative target
     * to {@link #seek(long)}.
     *
     * @param n number of bytes the caller would like to skip
     * @return the number of bytes actually skipped; {@code 0} when {@code n} is
     *         zero and {@code -1} when {@code n} is negative
     * @throws IOException if the underlying seek fails
     */
    @Override
    public long skip(long n) throws IOException {
        if (n <= 0L) {
            return n < 0L ? -1L : 0L;
        }
        long curPos = this.getPos();
        long fileLen = this.getFileLength();
        // Overflow-safe form of "n + curPos > fileLen".
        long remaining = fileLen - curPos;
        if (n > remaining) {
            n = remaining;
        }
        this.seek(curPos + n);
        return n;
    }

    /**
     * Moves the stream position to {@code targetPos}.
     *
     * <p>When the target lies at or ahead of the current position, inside the
     * current block, and no further away than the block reader reports as
     * available, the intervening bytes are skipped on the existing reader.
     * Otherwise, or if that skip fails or falls short, the position is simply
     * recorded and {@code blockEnd} is set to {@code -1}.
     *
     * @param targetPos absolute file offset to move to
     * @throws IOException if {@code targetPos} is beyond the file length, negative,
     *                     or the stream is closed
     */
    @Override
    public synchronized void seek(long targetPos) throws IOException {
        if (targetPos > this.getFileLength()) {
            throw new IOException("Cannot seek after EOF");
        }
        if (targetPos < 0L) {
            throw new IOException("Cannot seek to negative offset");
        }
        if (this.closed) {
            throw new IOException("Stream is closed!");
        }

        boolean done = false;
        if (this.pos <= targetPos && targetPos <= this.blockEnd) {
            int diff = (int)(targetPos - this.pos);
            if (diff <= this.blockReader.available()) {
                try {
                    this.pos += this.blockReader.skip(diff);
                    done = this.pos == targetPos;
                }
                catch (IOException e) {
                    // A failed skip is not fatal: fall through to the plain reposition below.
                    if (DFSClient.LOG.isDebugEnabled()) {
                        DFSClient.LOG.debug("Exception while seek to " + targetPos + " from " + this.getCurrentBlock() + " of " + this.src + " from " + this.currentNode, e);
                    }
                }
            }
        }

        if (!done) {
            this.pos = targetPos;
            this.blockEnd = -1L;
        }
    }

    /**
     * Positions the stream on the block containing {@code targetPos} via
     * {@code blockSeekTo} and records the datanode it returns as the current node.
     *
     * @param targetPos absolute file offset to position at
     * @return always {@code true}
     * @throws IOException if {@code blockSeekTo} fails
     */
    private synchronized boolean seekToBlockSource(long targetPos) throws IOException {
        final DatanodeInfo chosen = this.blockSeekTo(targetPos);
        this.currentNode = chosen;
        return true;
    }

    /**
     * Tries to position the stream at {@code targetPos} on a different datanode
     * than the current one.
     *
     * <p>The current node is put in the dead-node set for the duration of the
     * {@code blockSeekTo} call (presumably so that it is not chosen again) and is
     * taken back out afterwards unless it had already been marked dead before
     * this call.
     *
     * <p>If no node is currently selected ({@code currentNode} is {@code null})
     * there is nothing to switch away from; the call then just positions on the
     * block via {@code seekToBlockSource} instead of failing with a
     * {@code NullPointerException} on the old node.
     *
     * @param targetPos absolute file offset to position at
     * @return {@code true} if the stream now reads from a node whose storage ID
     *         differs from the previous one (or if no node was selected before);
     *         {@code false} if the same storage was chosen again
     * @throws IOException if positioning on the block fails
     */
    @Override
    public synchronized boolean seekToNewSource(long targetPos) throws IOException {
        if (this.currentNode == null) {
            return this.seekToBlockSource(targetPos);
        }
        boolean markedDead = this.deadNodes.containsKey(this.currentNode);
        this.addToDeadNodes(this.currentNode);
        DatanodeInfo oldNode = this.currentNode;
        DatanodeInfo newNode = this.blockSeekTo(targetPos);
        if (!markedDead) {
            // Undo the temporary marking made above.
            this.deadNodes.remove(oldNode);
        }
        if (!oldNode.getStorageID().equals(newNode.getStorageID())) {
            this.currentNode = newNode;
            return true;
        }
        return false;
    }

    /**
     * Returns the stream's current position as stored in {@code pos}.
     *
     * @return the current value of {@code pos}
     * @throws IOException declared by the signature; this body does not throw it
     */
    @Override
    public synchronized long getPos() throws IOException {
        return this.pos;
    }

    /**
     * Reports how many bytes lie between the current position and the file
     * length, capped at {@link Integer#MAX_VALUE}.
     *
     * @return the remaining byte count, narrowed to {@code int}
     * @throws IOException if the stream is closed ("Stream closed")
     */
    @Override
    public synchronized int available() throws IOException {
        if (this.closed) {
            throw new IOException("Stream closed");
        }
        final long bytesLeft = this.getFileLength() - this.pos;
        return (int)Math.min(bytesLeft, (long)Integer.MAX_VALUE);
    }

    /**
     * Indicates that this stream does not support mark/reset.
     *
     * @return always {@code false}
     */
    @Override
    public boolean markSupported() {
        return false;
    }

    /**
     * Does nothing: the body is intentionally empty and {@code readLimit} is ignored.
     *
     * @param readLimit unused
     */
    @Override
    public void mark(int readLimit) {
    }

    /**
     * Always fails, because mark/reset is not supported by this stream.
     *
     * @throws IOException unconditionally, with message "Mark/reset not supported"
     */
    @Override
    public void reset() throws IOException {
        throw new IOException("Mark/reset not supported");
    }

    /**
     * Picks the first entry of {@code nodes}, in array order, that is not a key
     * of {@code deadNodes}.
     *
     * @param nodes     candidate datanodes; may be {@code null}
     * @param deadNodes nodes to pass over
     * @return the first candidate not present in {@code deadNodes}
     * @throws IOException if {@code nodes} is {@code null} or every candidate is
     *                     in {@code deadNodes}
     */
    static DatanodeInfo bestNode(DatanodeInfo[] nodes, AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes) throws IOException {
        if (nodes != null) {
            for (DatanodeInfo candidate : nodes) {
                if (!deadNodes.containsKey(candidate)) {
                    return candidate;
                }
            }
        }
        throw new IOException("No live nodes contain current block");
    }

    /**
     * Returns a copy of this stream's read statistics, so later reads do not
     * change the object handed to the caller.
     *
     * @return a new {@code ReadStatistics} built from the current counters
     */
    public synchronized ReadStatistics getReadStatistics() {
        final ReadStatistics snapshot = new ReadStatistics(this.readStatistics);
        return snapshot;
    }

    /**
     * Closes and discards the current block reader, if there is one.
     *
     * <p>A failure to close is logged and otherwise ignored. Besides clearing
     * {@code blockReader}, {@code blockEnd} is reset to {@code -1}: {@code seek}
     * uses {@code targetPos <= blockEnd} to decide whether it may call into
     * {@code blockReader}, so leaving the old block end in place after the reader
     * has been nulled would let a later seek inside the old block dereference a
     * {@code null} reader. {@code seek} itself uses {@code -1} for the same
     * purpose when it gives up on the current reader.
     */
    private synchronized void closeCurrentBlockReader() {
        if (this.blockReader == null) {
            return;
        }
        try {
            this.blockReader.close();
        }
        catch (IOException e) {
            DFSClient.LOG.error("error closing blockReader", e);
        }
        this.blockReader = null;
        // No reader is positioned in any block any more.
        this.blockEnd = -1L;
    }

    /**
     * Stores a new readahead value in the caching strategy and then closes the
     * current block reader.
     *
     * <p>NOTE(review): block readers in this class are created with
     * {@code this.cachingStrategy}, so dropping the current reader presumably
     * makes the next one pick up the new setting; confirm.
     *
     * @param readahead the readahead value handed to the caching strategy
     * @throws IOException declared by the signature
     */
    @Override
    public synchronized void setReadahead(Long readahead) throws IOException {
        this.cachingStrategy.setReadahead(readahead);
        this.closeCurrentBlockReader();
    }

    /**
     * Stores a new drop-behind flag in the caching strategy and then closes the
     * current block reader.
     *
     * <p>NOTE(review): block readers in this class are created with
     * {@code this.cachingStrategy}, so dropping the current reader presumably
     * makes the next one pick up the new setting; confirm.
     *
     * @param dropBehind the drop-behind value handed to the caching strategy
     * @throws IOException declared by the signature
     */
    @Override
    public synchronized void setDropBehind(Boolean dropBehind) throws IOException {
        this.cachingStrategy.setDropBehind(dropBehind);
        this.closeCurrentBlockReader();
    }

    /**
     * Simple holder that pairs a {@link DatanodeInfo} with an
     * {@link InetSocketAddress}. Both fields are package-visible and mutable.
     */
    static class DNAddrPair {
        // The datanode description.
        DatanodeInfo info;
        // Presumably the socket address used to reach that datanode; verify against callers.
        InetSocketAddress addr;

        /**
         * @param info the datanode description to store
         * @param addr the socket address to store alongside it
         */
        DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
            this.info = info;
            this.addr = addr;
        }
    }

    /**
     * {@code ReaderStrategy} that reads from a block reader into a
     * {@link ByteBuffer} supplied at construction time.
     */
    private static class ByteBufferStrategy
    implements ReaderStrategy {
        final ByteBuffer buf;

        /**
         * @param buf destination buffer; bytes are written at its current position
         */
        ByteBufferStrategy(ByteBuffer buf) {
            this.buf = buf;
        }

        /**
         * Reads from {@code blockReader} into the buffer and records the byte
         * count in {@code readStatistics}.
         *
         * <p>{@code off} and {@code len} are not used; the buffer's own position
         * and limit determine where and how much is read. If the read throws, the
         * buffer's position and limit are put back to what they were on entry.
         * The limit is restored first: {@code Buffer.position(int)} rejects a
         * position above the current limit, so restoring the position first could
         * itself throw from the {@code finally} block (hiding the read failure)
         * if the failed read had left the limit below the saved position.
         *
         * @param blockReader    source of the data
         * @param off            ignored
         * @param len            ignored
         * @param readStatistics counters updated with the result of the read
         * @return the value returned by {@code blockReader.read(buf)}
         * @throws ChecksumException declared by the signature; thrown by callees
         * @throws IOException       if the underlying read fails
         */
        @Override
        public int doRead(BlockReader blockReader, int off, int len, ReadStatistics readStatistics) throws ChecksumException, IOException {
            final int savedPosition = this.buf.position();
            final int savedLimit = this.buf.limit();
            boolean success = false;
            try {
                int ret = blockReader.read(this.buf);
                // The read itself succeeded; a failure while updating statistics must not roll the buffer back.
                success = true;
                DFSInputStream.updateReadStatistics(readStatistics, ret, blockReader);
                return ret;
            }
            finally {
                if (!success) {
                    this.buf.limit(savedLimit);
                    this.buf.position(savedPosition);
                }
            }
        }
    }

    /**
     * {@code ReaderStrategy} that reads from a block reader into a byte array
     * supplied at construction time.
     */
    private static class ByteArrayStrategy
    implements ReaderStrategy {
        final byte[] buf;

        /**
         * @param buf destination array
         */
        public ByteArrayStrategy(byte[] buf) {
            this.buf = buf;
        }

        /**
         * Reads up to {@code len} bytes into the array starting at index
         * {@code off} and records the result in {@code readStatistics}.
         *
         * @return the value returned by {@code blockReader.read(buf, off, len)}
         */
        @Override
        public int doRead(BlockReader blockReader, int off, int len, ReadStatistics readStatistics) throws ChecksumException, IOException {
            final int bytesRead = blockReader.read(this.buf, off, len);
            DFSInputStream.updateReadStatistics(readStatistics, bytesRead, blockReader);
            return bytesRead;
        }
    }

    /**
     * Abstraction over the destination of a read, implemented in this file by
     * {@code ByteArrayStrategy} and {@code ByteBufferStrategy}.
     */
    private static interface ReaderStrategy {
        /**
         * Performs one read from a block reader into the strategy's destination.
         *
         * <p>The parameter names are decompiler-generated; in the implementations
         * in this file they are named {@code blockReader}, {@code off},
         * {@code len} and {@code readStatistics} respectively.
         *
         * @param var1 the block reader to read from
         * @param var2 offset into the destination (used by the byte-array strategy)
         * @param var3 maximum number of bytes to read (used by the byte-array strategy)
         * @param var4 statistics object updated with the result of the read
         * @return the value returned by the underlying block reader read call
         */
        public int doRead(BlockReader var1, int var2, int var3, ReadStatistics var4) throws ChecksumException, IOException;
    }

    /**
     * Running byte counters for a stream: total bytes, the subset counted as
     * local, and the subset of those counted as short-circuit. Every short-circuit
     * byte is also counted as local, and every local byte is also counted in the
     * total.
     */
    public static class ReadStatistics {
        private long totalBytesRead;
        private long totalLocalBytesRead;
        private long totalShortCircuitBytesRead;

        /** Creates statistics with all counters at zero. */
        public ReadStatistics() {
            this(0L, 0L, 0L);
        }

        /**
         * Creates a copy of {@code rhs}, reading its counters through its getters.
         *
         * @param rhs the statistics to copy
         */
        public ReadStatistics(ReadStatistics rhs) {
            this(rhs.getTotalBytesRead(), rhs.getTotalLocalBytesRead(), rhs.getTotalShortCircuitBytesRead());
        }

        /** Shared initializer used by both public constructors. */
        private ReadStatistics(long total, long local, long shortCircuit) {
            this.totalBytesRead = total;
            this.totalLocalBytesRead = local;
            this.totalShortCircuitBytesRead = shortCircuit;
        }

        /** @return total number of bytes counted, of any kind */
        public long getTotalBytesRead() {
            return this.totalBytesRead;
        }

        /** @return number of bytes counted as local (includes short-circuit bytes) */
        public long getTotalLocalBytesRead() {
            return this.totalLocalBytesRead;
        }

        /** @return number of bytes counted as short-circuit */
        public long getTotalShortCircuitBytesRead() {
            return this.totalShortCircuitBytesRead;
        }

        /** @return total bytes minus local bytes */
        public long getRemoteBytesRead() {
            return this.totalBytesRead - this.totalLocalBytesRead;
        }

        /** Counts {@code amt} bytes towards the total only. */
        void addRemoteBytes(long amt) {
            this.totalBytesRead += amt;
        }

        /** Counts {@code amt} bytes towards the total and the local counter. */
        void addLocalBytes(long amt) {
            this.totalBytesRead += amt;
            this.totalLocalBytesRead += amt;
        }

        /** Counts {@code amt} bytes towards all three counters. */
        void addShortCircuitBytes(long amt) {
            this.totalBytesRead += amt;
            this.totalLocalBytesRead += amt;
            this.totalShortCircuitBytesRead += amt;
        }
    }
}

