in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java [255:319]
public FetchResult callInternal() throws Exception {
  // Performs one fetch pass against this fetcher's host/port:
  //   1. indexes the outstanding source attempts by (path component, partition),
  //   2. picks the local-disk, shared, or HTTP fetch path,
  //   3. reports any failed inputs to the fetcher callback (unless already shut down),
  //   4. calls shutdown() and returns the fetch result of the chosen path.
  // When srcAttempts is empty, a FetchResult is built directly from the fetcher's
  // fields and returned without fetching (shutdown() is not called on that path).
  // Throws IOException when no failures were reported, attempts are still
  // outstanding, and the fetch was not a shared (multiplexed) one.

  // Shared fetch is only considered when both features are enabled.
  boolean multiplex = (this.sharedFetchEnabled && this.localDiskFetchEnabled);

  if (srcAttempts.size() == 0) {
    return new FetchResult(host, port, partition, partitionCount, srcAttempts);
  }

  populateRemainingMap(srcAttempts);
  for (InputAttemptIdentifier in : srcAttemptsRemaining.values()) {
    if (in instanceof CompositeInputAttemptIdentifier) {
      // A composite identifier covers several inputs; register each expanded
      // identifier under its own partition offset.
      CompositeInputAttemptIdentifier cin = (CompositeInputAttemptIdentifier) in;
      for (int i = 0; i < cin.getInputIdentifierCount(); i++) {
        pathToAttemptMap.put(new PathPartition(cin.getPathComponent(), partition + i), cin.expand(i));
      }
    } else {
      pathToAttemptMap.put(new PathPartition(in.getPathComponent(), 0), in);
    }

    // do only if all of them are shared fetches
    multiplex &= in.isShared();
  }

  if (multiplex) {
    // Use %s rather than %d: Guava-style Preconditions templates only substitute
    // %s, so %d would be left in the message verbatim. %s is also valid for an
    // int argument should the helper delegate to String.format.
    Preconditions.checkArgument(partition == 0,
        "Shared fetches cannot be done for partitioned input"
            + "- partition is non-zero (%s)", partition);
  }

  HostFetchResult hostFetchResult;

  // Local-disk fetch takes precedence over shared and HTTP fetches.
  boolean isLocalFetch = localDiskFetchEnabled && host.equals(localHostname) && port == shufflePort;
  if (isLocalFetch) {
    hostFetchResult = setupLocalDiskFetch();
  } else if (multiplex) {
    hostFetchResult = doSharedFetch();
  } else {
    hostFetchResult = doHttpFetch();
  }

  if (hostFetchResult.failedInputs != null && hostFetchResult.failedInputs.length > 0) {
    if (!isShutDown.get()) {
      LOG.warn("copyInputs failed for tasks " + Arrays.toString(hostFetchResult.failedInputs));
      for (InputAttemptFetchFailure left : hostFetchResult.failedInputs) {
        fetcherCallback.fetchFailed(host, left, hostFetchResult.connectFailed);
      }
    } else if (isDebugEnabled) {
      // Fetcher was stopped in the meantime: failures are logged, not reported.
      LOG.debug("Ignoring failed fetch reports for " + hostFetchResult.failedInputs.length +
          " inputs since the fetcher has already been stopped");
    }
  }

  shutdown();

  // Sanity check: with no reported failures, nothing should be left outstanding.
  // Shared (multiplexed) fetches are exempt from this check.
  if (hostFetchResult.failedInputs == null && !srcAttemptsRemaining.isEmpty() && !multiplex) {
    throw new IOException("server didn't return all expected map outputs: "
        + srcAttemptsRemaining.size() + " left.");
  }

  return hostFetchResult.fetchResult;
}