in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/Fetcher.java [108:208]
public FetchResult call() throws Exception {
  // Fetches the inputs listed in srcAttempts from host:port over one HTTP
  // connection. Returns a FetchResult built from host, port, partition and the
  // attempts still held in 'remaining' (presumably fetchInputs removes the ones
  // it completes; verify against fetchInputs). Failures are reported through
  // fetcherCallback.fetchFailed unless isShutDown is already set.
  // Throws IOException if the loop finishes without failures but with inputs
  // still outstanding; anything thrown by fetchInputs/shutdown propagates.
  if (srcAttempts.isEmpty()) {
    return new FetchResult(host, port, partition, srcAttempts);
  }

  for (InputAttemptIdentifier in : srcAttempts) {
    pathToAttemptMap.put(in.getPathComponent(), in);
  }

  remaining = new LinkedHashSet<InputAttemptIdentifier>(srcAttempts);

  try {
    StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
        port, partition, appId.toString(), httpConnectionParams.isSSLShuffleEnabled());
    this.url = ShuffleUtils.constructInputURL(baseURI.toString(), srcAttempts,
        httpConnectionParams.getKeepAlive());

    httpConnection = new HttpConnection(url, httpConnectionParams, logIdentifier, shuffleSecret);
    httpConnection.connect();
  } catch (IOException e) {
    // If connect did not succeed, just mark all the maps as failed,
    // indirectly penalizing the host
    if (isShutDown.get()) {
      LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
    } else {
      // Log the cause; without this the connect exception is lost entirely.
      LOG.warn("Fetch Failure while connecting to host: " + host + ", port: " + port
          + " Informing ShuffleManager: ", e);
      for (InputAttemptIdentifier left : remaining) {
        fetcherCallback.fetchFailed(host, left, true);
      }
    }
    return new FetchResult(host, port, partition, remaining);
  }

  if (isShutDown.get()) {
    // shutdown would have no effect if in the process of establishing the connection.
    shutdownInternal();
    LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
    return new FetchResult(host, port, partition, remaining);
  }

  try {
    input = httpConnection.getInputStream();
    httpConnection.validate();
  } catch (IOException e) {
    // If we got a read error at this stage, it implies there was a problem
    // with the first map, typically lost map. So, penalize only that map
    // and add the rest
    if (isShutDown.get()) {
      // Falls through to the shutdown check below, which cleans up and returns.
      LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
    } else {
      InputAttemptIdentifier firstAttempt = srcAttempts.get(0);
      LOG.warn("Fetch Failure from host while connecting: " + host + ", attempt: " + firstAttempt
          + " Informing ShuffleManager: ", e);
      fetcherCallback.fetchFailed(host, firstAttempt, false);
      // connect() succeeded above, so a connection exists here. Clean it up the
      // same way the normal-completion path does instead of returning with it
      // still open.
      shutdown();
      return new FetchResult(host, port, partition, remaining);
    }
  }

  // By this point, the connection is setup and the response has been
  // validated.
  // Handle any shutdown which may have been invoked.
  if (isShutDown.get()) {
    // shutdown would have no effect if in the process of establishing the connection.
    shutdownInternal();
    LOG.info("Detected fetcher has been shutdown after opening stream. Returning");
    return new FetchResult(host, port, partition, remaining);
  }

  // After this point, closing the stream and connection, should cause a
  // SocketException,
  // which will be ignored since shutdown has been invoked.

  // Loop through available map-outputs and fetch them.
  // On any error, failedInputs is not null and we exit after marking the
  // failed inputs; whatever is left in 'remaining' goes back to the caller
  // via the FetchResult.
  InputAttemptIdentifier[] failedInputs = null;
  try {
    while (!remaining.isEmpty() && failedInputs == null) {
      failedInputs = fetchInputs(input);
    }

    if (failedInputs != null && failedInputs.length > 0) {
      LOG.warn("copyInputs failed for tasks " + Arrays.toString(failedInputs));
      for (InputAttemptIdentifier left : failedInputs) {
        fetcherCallback.fetchFailed(host, left, false);
      }
    }
  } finally {
    // Always runs, so an exception from fetchInputs or the callback cannot
    // skip the cleanup.
    shutdown();
  }

  // Sanity check. The loop above only exits normally once 'remaining' is empty
  // or failedInputs is non-null, so this is a defensive guard.
  if (failedInputs == null && !remaining.isEmpty()) {
    throw new IOException("server didn't return all expected map outputs: "
        + remaining.size() + " left.");
  }

  return new FetchResult(host, port, partition, remaining);
}