/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.http.BaseHttpConnection;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchResult;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputCallback;
import org.apache.tez.runtime.library.common.shuffle.FetcherCallback;
import org.apache.tez.runtime.library.common.shuffle.LocalDiskFetchedInput;
import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Fetcher
extends CallableWithNdc<FetchResult> {
    private static final Logger LOG = LoggerFactory.getLogger(Fetcher.class);
    private static final AtomicInteger fetcherIdGen = new AtomicInteger(0);
    private final Configuration conf;
    private final int shufflePort;
    private CompressionCodec codec;
    private boolean ifileReadAhead = true;
    private int ifileReadAheadLength = 0x400000;
    private final JobTokenSecretManager jobTokenSecretMgr;
    private final FetcherCallback fetcherCallback;
    private final FetchedInputAllocator inputManager;
    private final ApplicationId appId;
    private final int dagIdentifier;
    private final String logIdentifier;
    private final String localHostname;
    private final AtomicBoolean isShutDown = new AtomicBoolean(false);
    private final int fetcherIdentifier;
    private List<InputAttemptIdentifier> srcAttempts;
    @VisibleForTesting
    Map<String, InputAttemptIdentifier> srcAttemptsRemaining;
    private String host;
    private int port;
    private int partition;
    private final Map<String, InputAttemptIdentifier> pathToAttemptMap;
    private URL url;
    private volatile DataInputStream input;
    BaseHttpConnection httpConnection;
    private HttpConnectionParams httpConnectionParams;
    private final boolean localDiskFetchEnabled;
    private final boolean sharedFetchEnabled;
    private final LocalDirAllocator localDirAllocator;
    private final Path lockPath;
    private final RawLocalFileSystem localFs;
    private long retryStartTime = 0L;
    private final boolean asyncHttp;
    private final boolean verifyDiskChecksum;
    private final boolean isDebugEnabled = LOG.isDebugEnabled();

    private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params, FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf, RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled, boolean sharedFetchEnabled, String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum) {
        this.asyncHttp = asyncHttp;
        this.verifyDiskChecksum = verifyDiskChecksum;
        this.fetcherCallback = fetcherCallback;
        this.inputManager = inputManager;
        this.jobTokenSecretMgr = jobTokenSecretManager;
        this.appId = appId;
        this.dagIdentifier = dagIdentifier;
        this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
        this.httpConnectionParams = params;
        this.conf = conf;
        this.localDiskFetchEnabled = localDiskFetchEnabled;
        this.sharedFetchEnabled = sharedFetchEnabled;
        this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
        this.logIdentifier = " fetcher [" + srcNameTrimmed + "] " + this.fetcherIdentifier;
        this.localFs = localFs;
        this.localDirAllocator = localDirAllocator;
        this.lockPath = lockPath;
        this.localHostname = localHostname;
        this.shufflePort = shufflePort;
        try {
            if (this.sharedFetchEnabled) {
                this.localFs.mkdirs(this.lockPath);
            }
        }
        catch (Exception e) {
            LOG.warn("Error initializing local dirs for shared transfer " + e);
        }
    }

    void populateRemainingMap(List<InputAttemptIdentifier> origlist) {
        if (this.srcAttemptsRemaining == null) {
            this.srcAttemptsRemaining = new LinkedHashMap<String, InputAttemptIdentifier>(origlist.size());
        }
        for (InputAttemptIdentifier id : origlist) {
            this.srcAttemptsRemaining.put(id.toString(), id);
        }
    }

    protected FetchResult callInternal() throws Exception {
        HostFetchResult hostFetchResult;
        boolean multiplex;
        boolean bl = multiplex = this.sharedFetchEnabled && this.localDiskFetchEnabled;
        if (this.srcAttempts.size() == 0) {
            return new FetchResult(this.host, this.port, this.partition, this.srcAttempts);
        }
        this.populateRemainingMap(this.srcAttempts);
        for (InputAttemptIdentifier in : this.srcAttemptsRemaining.values()) {
            this.pathToAttemptMap.put(in.getPathComponent(), in);
            multiplex &= in.isShared();
        }
        if (multiplex) {
            Preconditions.checkArgument((this.partition == 0 ? 1 : 0) != 0, (String)"Shared fetches cannot be done for partitioned input- partition is non-zero (%d)", (Object[])new Object[]{this.partition});
        }
        if ((hostFetchResult = this.localDiskFetchEnabled && this.host.equals(this.localHostname) && this.port == this.shufflePort ? this.setupLocalDiskFetch() : (multiplex ? this.doSharedFetch() : this.doHttpFetch())).failedInputs != null && hostFetchResult.failedInputs.length > 0) {
            if (!this.isShutDown.get()) {
                LOG.warn("copyInputs failed for tasks " + Arrays.toString(hostFetchResult.failedInputs));
                for (InputAttemptIdentifier left : hostFetchResult.failedInputs) {
                    this.fetcherCallback.fetchFailed(this.host, left, hostFetchResult.connectFailed);
                }
            } else if (this.isDebugEnabled) {
                LOG.debug("Ignoring failed fetch reports for " + hostFetchResult.failedInputs.length + " inputs since the fetcher has already been stopped");
            }
        }
        this.shutdown();
        if (hostFetchResult.failedInputs == null && !this.srcAttemptsRemaining.isEmpty() && !multiplex) {
            throw new IOException("server didn't return all expected map outputs: " + this.srcAttemptsRemaining.size() + " left.");
        }
        return hostFetchResult.fetchResult;
    }

    private int findInputs() throws IOException {
        int k = 0;
        for (InputAttemptIdentifier src : this.srcAttemptsRemaining.values()) {
            try {
                if (this.getShuffleInputFileName(src.getPathComponent(), ".index") == null) continue;
                ++k;
            }
            catch (DiskChecker.DiskErrorException de) {}
        }
        return k;
    }

    private FileLock getLock() throws OverlappingFileLockException, InterruptedException, IOException {
        File lockFile = this.localFs.pathToFile(new Path(this.lockPath, this.host + ".lock"));
        boolean created = lockFile.createNewFile();
        if (!created && !lockFile.exists()) {
            return null;
        }
        FileChannel lockChannel = new RandomAccessFile(lockFile, "rws").getChannel();
        FileLock xlock = null;
        xlock = lockChannel.tryLock(0L, Long.MAX_VALUE, false);
        if (xlock != null) {
            return xlock;
        }
        lockChannel.close();
        return null;
    }

    private void releaseLock(FileLock lock) throws IOException {
        if (lock != null && lock.isValid()) {
            FileChannel lockChannel = lock.channel();
            lock.release();
            lockChannel.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected HostFetchResult doSharedFetch() throws IOException {
        int inputs = this.findInputs();
        if (inputs == this.srcAttemptsRemaining.size()) {
            if (this.isDebugEnabled) {
                LOG.debug("Using the copies found locally");
            }
            return this.doLocalDiskFetch(true);
        }
        if (inputs > 0) {
            if (this.isDebugEnabled) {
                LOG.debug("Found " + this.input + " local fetches right now, using them first");
            }
            return this.doLocalDiskFetch(false);
        }
        FileLock lock = null;
        try {
            lock = this.getLock();
            if (lock == null) {
                HostFetchResult hostFetchResult = new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.srcAttemptsRemaining.values(), "Requeuing as we didn't get a lock"), null, false);
                return hostFetchResult;
            }
            if (this.findInputs() == this.srcAttemptsRemaining.size()) {
                this.releaseLock(lock);
                lock = null;
                HostFetchResult hostFetchResult = this.doLocalDiskFetch(true);
                return hostFetchResult;
            }
            HostFetchResult hostFetchResult = this.doHttpFetch(new CachingCallBack());
            return hostFetchResult;
        }
        catch (OverlappingFileLockException jvmCrossLock) {
            LOG.warn("Double locking detected for " + this.host);
        }
        catch (InterruptedException sleepInterrupted) {
            Thread.currentThread().interrupt();
            LOG.warn("Lock was interrupted for " + this.host);
        }
        finally {
            this.releaseLock(lock);
        }
        if (this.isShutDown.get()) {
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.srcAttemptsRemaining.values()), null, false);
        }
        return this.doHttpFetch();
    }

    @VisibleForTesting
    protected HostFetchResult doHttpFetch() {
        return this.doHttpFetch(null);
    }

    private HostFetchResult setupConnection(Collection<InputAttemptIdentifier> attempts) {
        try {
            StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(this.host, this.port, this.partition, this.appId.toString(), this.dagIdentifier, this.httpConnectionParams.isSslShuffle());
            this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts, this.httpConnectionParams.isKeepAlive());
            this.httpConnection = ShuffleUtils.getHttpConnection(this.asyncHttp, this.url, this.httpConnectionParams, this.logIdentifier, this.jobTokenSecretMgr);
            this.httpConnection.connect();
        }
        catch (IOException | InterruptedException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            InputAttemptIdentifier[] failedFetches = null;
            if (this.isShutDown.get()) {
                if (this.isDebugEnabled) {
                    LOG.debug("Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." + e.getClass().getName() + ", Message: " + e.getMessage());
                }
            } else {
                failedFetches = this.srcAttemptsRemaining.values().toArray(new InputAttemptIdentifier[this.srcAttemptsRemaining.values().size()]);
            }
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.srcAttemptsRemaining.values()), failedFetches, true);
        }
        if (this.isShutDown.get()) {
            this.shutdownInternal();
            if (this.isDebugEnabled) {
                LOG.debug("Detected fetcher has been shutdown after connection establishment. Returning");
            }
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.srcAttemptsRemaining.values()), null, false);
        }
        try {
            this.input = this.httpConnection.getInputStream();
            this.httpConnection.validate();
        }
        catch (IOException e) {
            if (this.isShutDown.get()) {
                if (this.isDebugEnabled) {
                    LOG.debug("Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." + e.getClass().getName() + ", Message: " + e.getMessage());
                }
            }
            InputAttemptIdentifier firstAttempt = attempts.iterator().next();
            LOG.warn("Fetch Failure from host while connecting: " + this.host + ", attempt: " + firstAttempt + " Informing ShuffleManager: ", (Throwable)e);
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.srcAttemptsRemaining.values()), new InputAttemptIdentifier[]{firstAttempt}, false);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        return null;
    }

    @VisibleForTesting
    protected HostFetchResult doHttpFetch(CachingCallBack callback) {
        HostFetchResult connectionsWithRetryResult = this.setupConnection(this.srcAttemptsRemaining.values());
        if (connectionsWithRetryResult != null) {
            return connectionsWithRetryResult;
        }
        if (this.isShutDown.get()) {
            this.shutdownInternal();
            if (this.isDebugEnabled) {
                LOG.debug("Detected fetcher has been shutdown after opening stream. Returning");
            }
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.srcAttemptsRemaining.values()), null, false);
        }
        InputAttemptIdentifier[] failedInputs = null;
        while (!this.srcAttemptsRemaining.isEmpty() && failedInputs == null) {
            if (this.isShutDown.get()) {
                this.shutdownInternal(true);
                if (this.isDebugEnabled) {
                    LOG.debug("Fetcher already shutdown. Aborting queued fetches for " + this.srcAttemptsRemaining.size() + " inputs");
                }
                return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.srcAttemptsRemaining.values()), null, false);
            }
            try {
                failedInputs = this.fetchInputs(this.input, callback);
            }
            catch (FetcherReadTimeoutException e) {
                this.shutdownInternal(true);
                if (this.isShutDown.get()) {
                    if (this.isDebugEnabled) {
                        LOG.debug("Fetcher already shutdown. Aborting reconnection and queued fetches for " + this.srcAttemptsRemaining.size() + " inputs");
                    }
                    return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.srcAttemptsRemaining.values()), null, false);
                }
                connectionsWithRetryResult = this.setupConnection(this.srcAttemptsRemaining.values());
                if (connectionsWithRetryResult == null) continue;
                break;
            }
        }
        if (this.isShutDown.get() && failedInputs != null && failedInputs.length > 0) {
            if (this.isDebugEnabled) {
                LOG.debug("Fetcher already shutdown. Not reporting fetch failures for: " + failedInputs.length + " failed inputs");
            }
            failedInputs = null;
        }
        return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.srcAttemptsRemaining.values()), failedInputs, false);
    }

    @VisibleForTesting
    protected HostFetchResult setupLocalDiskFetch() {
        return this.doLocalDiskFetch(true);
    }

    @VisibleForTesting
    private HostFetchResult doLocalDiskFetch(boolean failMissing) {
        Iterator<Map.Entry<String, InputAttemptIdentifier>> iterator = this.srcAttemptsRemaining.entrySet().iterator();
        while (iterator.hasNext()) {
            if (this.isShutDown.get()) {
                if (!this.isDebugEnabled) break;
                LOG.debug("Already shutdown. Skipping fetch for " + this.srcAttemptsRemaining.size() + " inputs");
                break;
            }
            InputAttemptIdentifier srcAttemptId = iterator.next().getValue();
            long startTime = System.currentTimeMillis();
            LocalDiskFetchedInput fetchedInput = null;
            try {
                TezIndexRecord idxRecord = this.getTezIndexRecord(srcAttemptId);
                fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(), idxRecord.getRawLength(), idxRecord.getPartLength(), srcAttemptId, this.getShuffleInputFileName(srcAttemptId.getPathComponent(), null), this.conf, new FetchedInputCallback(){

                    @Override
                    public void fetchComplete(FetchedInput fetchedInput) {
                    }

                    @Override
                    public void fetchFailed(FetchedInput fetchedInput) {
                    }

                    @Override
                    public void freeResources(FetchedInput fetchedInput) {
                    }
                });
                if (this.isDebugEnabled) {
                    LOG.debug("fetcher about to shuffle output of srcAttempt (direct disk)" + srcAttemptId + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength() + " to " + (Object)((Object)fetchedInput.getType()));
                }
                long endTime = System.currentTimeMillis();
                this.fetcherCallback.fetchSucceeded(this.host, srcAttemptId, fetchedInput, idxRecord.getPartLength(), idxRecord.getRawLength(), endTime - startTime);
                iterator.remove();
            }
            catch (IOException e) {
                this.cleanupFetchedInput(fetchedInput);
                if (this.isShutDown.get()) {
                    if (!this.isDebugEnabled) break;
                    LOG.debug("Already shutdown. Ignoring Local Fetch Failure for " + srcAttemptId + " from host " + this.host + " : " + e.getClass().getName() + ", message=" + e.getMessage());
                    break;
                }
                if (!failMissing) continue;
                LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + this.host + "(local fetch)", (Throwable)e);
            }
        }
        InputAttemptIdentifier[] failedFetches = null;
        if (failMissing && this.srcAttemptsRemaining.size() > 0) {
            if (this.isShutDown.get()) {
                if (this.isDebugEnabled) {
                    LOG.debug("Already shutdown, not reporting fetch failures for: " + this.srcAttemptsRemaining.size() + " remaining inputs");
                }
            } else {
                failedFetches = this.srcAttemptsRemaining.values().toArray(new InputAttemptIdentifier[this.srcAttemptsRemaining.values().size()]);
            }
        }
        return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.srcAttemptsRemaining.values()), failedFetches, false);
    }

    @VisibleForTesting
    protected TezIndexRecord getTezIndexRecord(InputAttemptIdentifier srcAttemptId) throws IOException {
        Path indexFile = this.getShuffleInputFileName(srcAttemptId.getPathComponent(), ".index");
        TezSpillRecord spillRecord = new TezSpillRecord(indexFile, this.conf);
        TezIndexRecord idxRecord = spillRecord.getIndex(this.partition);
        return idxRecord;
    }

    private static final String getMapOutputFile(String pathComponent) {
        return "output/" + pathComponent + "/" + "file.out";
    }

    @VisibleForTesting
    protected Path getShuffleInputFileName(String pathComponent, String suffix) throws IOException {
        suffix = suffix != null ? suffix : "";
        String pathFromLocalDir = Fetcher.getMapOutputFile(pathComponent) + suffix;
        return this.localDirAllocator.getLocalPathToRead(pathFromLocalDir, this.conf);
    }

    public void shutdown() {
        if (!this.isShutDown.getAndSet(true)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Shutting down fetcher for host: " + this.host);
            }
            this.shutdownInternal();
        }
    }

    private void shutdownInternal() {
        this.shutdownInternal(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void shutdownInternal(boolean disconnect) {
        AtomicBoolean atomicBoolean = this.isShutDown;
        synchronized (atomicBoolean) {
            block6: {
                try {
                    if (this.httpConnection != null) {
                        this.httpConnection.cleanup(disconnect);
                    }
                }
                catch (IOException e) {
                    LOG.info("Exception while shutting down fetcher on " + this.logIdentifier + " : " + e.getMessage());
                    if (!this.isDebugEnabled) break block6;
                    LOG.debug("", (Throwable)e);
                }
            }
        }
    }

    private InputAttemptIdentifier[] fetchInputs(DataInputStream input, CachingCallBack callback) throws FetcherReadTimeoutException {
        FetchedInput fetchedInput = null;
        InputAttemptIdentifier srcAttemptId = null;
        long decompressedLength = -1L;
        long compressedLength = -1L;
        try {
            long startTime = System.currentTimeMillis();
            int responsePartition = -1;
            String pathComponent = null;
            try {
                ShuffleHeader header = new ShuffleHeader();
                header.readFields(input);
                pathComponent = header.getMapId();
                srcAttemptId = this.pathToAttemptMap.get(pathComponent);
                compressedLength = header.getCompressedLength();
                decompressedLength = header.getUncompressedLength();
                responsePartition = header.getPartition();
            }
            catch (IllegalArgumentException e) {
                if (!this.isShutDown.get()) {
                    LOG.warn("Invalid src id ", (Throwable)e);
                    return this.srcAttemptsRemaining.values().toArray(new InputAttemptIdentifier[this.srcAttemptsRemaining.size()]);
                }
                if (this.isDebugEnabled) {
                    LOG.debug("Already shutdown. Ignoring badId error with message: " + e.getMessage());
                }
                return null;
            }
            if (!this.verifySanity(compressedLength, decompressedLength, responsePartition, srcAttemptId, pathComponent)) {
                if (!this.isShutDown.get()) {
                    if (srcAttemptId == null) {
                        LOG.warn("Was expecting " + this.getNextRemainingAttempt() + " but got null");
                        srcAttemptId = this.getNextRemainingAttempt();
                    }
                    assert (srcAttemptId != null);
                    return new InputAttemptIdentifier[]{srcAttemptId};
                }
                if (this.isDebugEnabled) {
                    LOG.debug("Already shutdown. Ignoring verification failure.");
                }
                return null;
            }
            if (this.isDebugEnabled) {
                LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength + ", decomp len: " + decompressedLength);
            }
            fetchedInput = srcAttemptId.isShared() && callback != null ? this.inputManager.allocateType(FetchedInput.Type.DISK, decompressedLength, compressedLength, srcAttemptId) : this.inputManager.allocate(decompressedLength, compressedLength, srcAttemptId);
            if (this.isDebugEnabled) {
                LOG.debug("fetcher about to shuffle output of srcAttempt " + fetchedInput.getInputAttemptIdentifier() + " decomp: " + decompressedLength + " len: " + compressedLength + " to " + (Object)((Object)fetchedInput.getType()));
            }
            if (fetchedInput.getType() == FetchedInput.Type.MEMORY) {
                ShuffleUtils.shuffleToMemory(((MemoryFetchedInput)fetchedInput).getBytes(), input, (int)decompressedLength, (int)compressedLength, this.codec, this.ifileReadAhead, this.ifileReadAheadLength, LOG, fetchedInput.getInputAttemptIdentifier().toString());
            } else if (fetchedInput.getType() == FetchedInput.Type.DISK) {
                ShuffleUtils.shuffleToDisk(((DiskFetchedInput)fetchedInput).getOutputStream(), this.host + ":" + this.port, input, compressedLength, decompressedLength, LOG, fetchedInput.getInputAttemptIdentifier().toString(), this.ifileReadAhead, this.ifileReadAheadLength, this.verifyDiskChecksum);
            } else {
                throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " + fetchedInput);
            }
            if (srcAttemptId.isShared() && callback != null) {
                callback.cache(this.host, srcAttemptId, fetchedInput, compressedLength, decompressedLength);
            }
            long endTime = System.currentTimeMillis();
            this.retryStartTime = 0L;
            this.fetcherCallback.fetchSucceeded(this.host, srcAttemptId, fetchedInput, compressedLength, decompressedLength, endTime - startTime);
            this.srcAttemptsRemaining.remove(srcAttemptId.toString());
            return null;
        }
        catch (IOException ioe) {
            if (this.isShutDown.get()) {
                this.cleanupFetchedInput(fetchedInput);
                if (this.isDebugEnabled) {
                    LOG.debug("Already shutdown. Ignoring exception during fetch " + ioe.getClass().getName() + ", Message: " + ioe.getMessage());
                }
                return null;
            }
            if (this.shouldRetry(srcAttemptId, ioe)) {
                this.cleanupFetchedInput(fetchedInput);
                throw new FetcherReadTimeoutException(ioe);
            }
            if (srcAttemptId == null || fetchedInput == null) {
                LOG.info("fetcher failed to read map header" + srcAttemptId + " decomp: " + decompressedLength + ", " + compressedLength, (Throwable)ioe);
                this.cleanupFetchedInput(fetchedInput);
                if (srcAttemptId == null) {
                    return this.srcAttemptsRemaining.values().toArray(new InputAttemptIdentifier[this.srcAttemptsRemaining.size()]);
                }
                return new InputAttemptIdentifier[]{srcAttemptId};
            }
            LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + this.host, (Throwable)ioe);
            this.cleanupFetchedInput(fetchedInput);
            return new InputAttemptIdentifier[]{srcAttemptId};
        }
    }

    private void cleanupFetchedInput(FetchedInput fetchedInput) {
        if (fetchedInput != null) {
            try {
                fetchedInput.abort();
            }
            catch (IOException e) {
                LOG.info("Failure to cleanup fetchedInput: " + fetchedInput);
            }
        }
    }

    private boolean shouldRetry(InputAttemptIdentifier srcAttemptId, IOException ioe) {
        if (!(ioe instanceof SocketTimeoutException)) {
            return false;
        }
        long currentTime = System.currentTimeMillis();
        if (this.retryStartTime == 0L) {
            this.retryStartTime = currentTime;
        }
        if (currentTime - this.retryStartTime < (long)this.httpConnectionParams.getReadTimeout()) {
            LOG.warn("Shuffle output from " + srcAttemptId + " failed, retry it.");
            return true;
        }
        LOG.warn("Timeout for copying MapOutput with retry on host " + this.host + "after " + this.httpConnectionParams.getReadTimeout() + "milliseconds.");
        return false;
    }

    private boolean verifySanity(long compressedLength, long decompressedLength, int fetchPartition, InputAttemptIdentifier srcAttemptId, String pathComponent) {
        if (compressedLength < 0L || decompressedLength < 0L) {
            LOG.warn(" invalid lengths in input header -> headerPathComponent: " + pathComponent + ", nextRemainingSrcAttemptId: " + this.getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId + " len: " + compressedLength + ", decomp len: " + decompressedLength);
            return false;
        }
        if (fetchPartition != this.partition) {
            LOG.warn(" data for the wrong reduce -> headerPathComponent: " + pathComponent + "nextRemainingSrcAttemptId: " + this.getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId + " len: " + compressedLength + " decomp len: " + decompressedLength + " for reduce " + fetchPartition);
            return false;
        }
        if (this.srcAttemptsRemaining.get(srcAttemptId.toString()) == null) {
            LOG.warn("Invalid input. Received output for headerPathComponent: " + pathComponent + "nextRemainingSrcAttemptId: " + this.getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId);
            return false;
        }
        return true;
    }

    private InputAttemptIdentifier getNextRemainingAttempt() {
        if (this.srcAttemptsRemaining.size() > 0) {
            return this.srcAttemptsRemaining.values().iterator().next();
        }
        return null;
    }

    public int hashCode() {
        return this.fetcherIdentifier;
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (((Object)((Object)this)).getClass() != obj.getClass()) {
            return false;
        }
        Fetcher other = (Fetcher)((Object)obj);
        return this.fetcherIdentifier == other.fetcherIdentifier;
    }

    public static class FetcherBuilder {
        private Fetcher fetcher;
        private boolean workAssigned = false;

        public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams params, FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum) {
            this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier, jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled, false, localHostname, shufflePort, asyncHttp, verifyDiskChecksum);
        }

        public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams params, FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, Configuration conf, RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled, boolean sharedFetchEnabled, String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum) {
            this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier, jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp, verifyDiskChecksum);
        }

        public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {
            this.fetcher.httpConnectionParams = httpParams;
            return this;
        }

        public FetcherBuilder setCompressionParameters(CompressionCodec codec) {
            this.fetcher.codec = codec;
            return this;
        }

        public FetcherBuilder setIFileParams(boolean readAhead, int readAheadBytes) {
            this.fetcher.ifileReadAhead = readAhead;
            this.fetcher.ifileReadAheadLength = readAheadBytes;
            return this;
        }

        public FetcherBuilder assignWork(String host, int port, int partition, List<InputAttemptIdentifier> inputs) {
            this.fetcher.host = host;
            this.fetcher.port = port;
            this.fetcher.partition = partition;
            this.fetcher.srcAttempts = inputs;
            this.workAssigned = true;
            return this;
        }

        public Fetcher build() {
            Preconditions.checkState((this.workAssigned ? 1 : 0) != 0, (Object)"Cannot build a fetcher withot assigning work to it");
            return this.fetcher;
        }
    }

    static class HostFetchResult {
        private final FetchResult fetchResult;
        private final InputAttemptIdentifier[] failedInputs;
        private final boolean connectFailed;

        public HostFetchResult(FetchResult fetchResult, InputAttemptIdentifier[] failedInputs, boolean connectFailed) {
            this.fetchResult = fetchResult;
            this.failedInputs = failedInputs;
            this.connectFailed = connectFailed;
        }
    }

    private final class CachingCallBack {
        private CachingCallBack() {
        }

        public void cache(String host, InputAttemptIdentifier srcAttemptId, FetchedInput fetchedInput, long compressedLength, long decompressedLength) {
            try {
                TezIndexRecord indexRec;
                Preconditions.checkArgument((Fetcher.this.partition == 0 ? 1 : 0) != 0, (Object)"Partition == 0");
                String tmpSuffix = "" + System.currentTimeMillis() + ".tmp";
                String finalOutput = Fetcher.getMapOutputFile(srcAttemptId.getPathComponent());
                Path outputPath = Fetcher.this.localDirAllocator.getLocalPathForWrite(finalOutput, compressedLength, Fetcher.this.conf);
                TezSpillRecord spillRec = new TezSpillRecord(1);
                Path tmpIndex = outputPath.suffix(".index" + tmpSuffix);
                if (Fetcher.this.localFs.exists(tmpIndex)) {
                    LOG.warn("Found duplicate instance of input index file " + tmpIndex);
                    return;
                }
                Path tmpPath = null;
                switch (fetchedInput.getType()) {
                    case DISK: {
                        DiskFetchedInput input = (DiskFetchedInput)fetchedInput;
                        indexRec = new TezIndexRecord(0L, decompressedLength, compressedLength);
                        Fetcher.this.localFs.mkdirs(outputPath.getParent());
                        tmpPath = outputPath.suffix(tmpSuffix);
                        Fetcher.this.localFs.copyFromLocalFile(input.getInputPath(), tmpPath);
                        boolean renamed = Fetcher.this.localFs.rename(tmpPath, outputPath);
                        if (renamed) break;
                        LOG.warn("Could not rename to cached file name " + outputPath);
                        Fetcher.this.localFs.delete(tmpPath, false);
                        return;
                    }
                    default: {
                        LOG.warn("Incorrect use of CachingCallback for " + srcAttemptId);
                        return;
                    }
                }
                spillRec.putIndex(indexRec, 0);
                spillRec.writeToFile(tmpIndex, Fetcher.this.conf);
                boolean renamed = Fetcher.this.localFs.rename(tmpIndex, outputPath.suffix(".index"));
                if (!renamed) {
                    Fetcher.this.localFs.delete(tmpIndex, false);
                    Fetcher.this.localFs.delete(outputPath, false);
                    LOG.warn("Could not rename the index file to " + outputPath.suffix(".index"));
                    return;
                }
            }
            catch (IOException ioe) {
                LOG.warn("Cache threw an error " + ioe);
            }
        }
    }
}

