in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java [191:290]
/**
 * Fetches the outputs the scheduler currently lists for {@code host} over a
 * single HTTP connection.
 *
 * <p>The host's partition id is recorded in {@code currentPartition} and the
 * attempts still to be fetched are tracked in {@code remaining}. If opening or
 * validating the connection fails, every attempt in {@code remaining} is
 * reported to the scheduler as failed. On every exit path taken after
 * {@code remaining} has been populated, the outstanding outputs are handed to
 * {@code putBackRemainingMapOutputs(host)}.
 *
 * @param host the host to fetch from
 * @throws IOException if copying stops without a reported failure while
 *         attempts are still outstanding, or if an I/O error propagates from
 *         the copy or connection-cleanup calls
 */
protected void copyFromHost(MapHost host) throws IOException {
  // Ask the scheduler which attempts are available on this host.
  List<InputAttemptIdentifier> pendingAttempts = scheduler.getMapsForHost(host);
  currentPartition = host.getPartitionId();

  // A host may be left with nothing to fetch (e.g. only 'OBSOLETE' maps,
  // typically at the tail of large jobs); there is nothing to do then.
  if (pendingAttempts.isEmpty()) {
    return;
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
        + pendingAttempts + ", partitionId: " + currentPartition);
  }

  // Everything is still outstanding at this point.
  remaining = new LinkedHashSet<InputAttemptIdentifier>(pendingAttempts);

  // Phase 1: build the URL, connect, and validate the response.
  boolean connectSucceeded = false;
  try {
    URL fetchUrl = ShuffleUtils.constructInputURL(host.getBaseUrl(), pendingAttempts,
        httpConnectionParams.getKeepAlive());
    httpConnection = new HttpConnection(fetchUrl, httpConnectionParams,
        logIdentifier, jobTokenSecret);
    connectSucceeded = httpConnection.connect();

    if (stopped) {
      // Shutdown was requested while the connection was being established.
      LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
      cleanupCurrentConnection(true);
      putBackRemainingMapOutputs(host);
      return;
    }

    input = httpConnection.getInputStream();
    httpConnection.validate();
  } catch (IOException connectFailure) {
    if (stopped) {
      // An error seen after shutdown is not reported as a fetch failure.
      LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
      cleanupCurrentConnection(true);
      putBackRemainingMapOutputs(host);
      return;
    }

    ioErrs.increment(1);

    // Distinguish "could not connect" from "connected but the reply did not validate".
    final String warning = connectSucceeded
        ? "Failed to verify reply after connecting to " + host + " with " + remaining.size()
            + " inputs pending"
        : "Failed to connect to " + host + " with " + remaining.size() + " inputs";
    LOG.warn(warning, connectFailure);
    if (!connectSucceeded) {
      connectionErrs.increment(1);
    }

    // The failure is not tied to any particular input, so every outstanding
    // attempt is reported as failed; this indirectly penalizes the host
    // (several failures reported against the one host).
    final boolean connectError = !connectSucceeded;
    for (InputAttemptIdentifier attempt : remaining) {
      // Report the error so that source-failure heuristics can be triggered.
      scheduler.copyFailed(attempt, host, connectSucceeded, connectError);
    }

    // Nothing was fetched, so all attempts this fetch started with go back;
    // retry handling is left to the scheduler.
    putBackRemainingMapOutputs(host);
    return;
  }

  // Phase 2: pull outputs off the stream until done or until the first failure.
  try {
    // After a failure we do not know how much of the stream to skip, so the
    // loop stops at the first one (YARN-1773); the rest is retried later.
    InputAttemptIdentifier[] failed = null;
    while (failed == null && !remaining.isEmpty()) {
      failed = copyMapOutput(host, input);
    }

    final boolean sawFailure = failed != null;
    if (sawFailure && failed.length > 0) {
      LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failed));
      for (InputAttemptIdentifier attempt : failed) {
        scheduler.copyFailed(attempt, host, true, false);
      }
    }

    cleanupCurrentConnection(false);

    // Sanity check: no failure was reported, yet some outputs never arrived.
    if (!sawFailure && !remaining.isEmpty()) {
      throw new IOException("server didn't return all expected map outputs: "
          + remaining.size() + " left.");
    }
  } finally {
    // Whatever is still outstanding goes back to the yet-to-be-fetched list.
    putBackRemainingMapOutputs(host);
  }
}