in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java [428:508]
protected Void callInternal() throws Exception {
  // Scheduler loop. Each pass waits until a fetcher slot is free and a host is pending,
  // then constructs and submits fetchers for pending hosts. The loop ends when all inputs
  // have completed, shutdown has been requested, or a shuffle error has been recorded.
  // Afterwards the elapsed shuffle time is recorded and the fetcher executor is shut down.
  // Always returns null. An InterruptedException raised while waiting, or while taking a
  // host when shutdown has not been requested, propagates to the caller.
  while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
    lock.lock();
    try {
      // Nothing can be scheduled while every fetcher slot is occupied or no host is
      // pending; keep waiting unless all inputs have already completed.
      while ((runningFetchers.size() >= numFetchers || pendingHosts.isEmpty())
          && numCompletedInputs.get() < numInputs) {
        inputContext.notifyProgress();
        // Timed wait (1000 ms) so that progress is reported and the shutdown flag is
        // re-checked periodically even when nothing signals wakeLoop. The result of
        // await() is deliberately ignored: every condition is re-evaluated after waking.
        // NOTE(review): wakeLoop is presumably a Condition created from `lock` - confirm.
        wakeLoop.await(1000, TimeUnit.MILLISECONDS);
        if (isShutdown.get()) {
          break;
        }
      }
    } finally {
      lock.unlock();
    }
    if (shuffleError != null) {
      // InputContext has already been informed of a fatal error. Relying on
      // tez to kill the task.
      break;
    }
    LOG.debug("{}: NumCompletedInputs: {}", sourceDestNameTrimmed, numCompletedInputs);
    if (numCompletedInputs.get() < numInputs && !isShutdown.get()) {
      lock.lock();
      try {
        // Number of fetcher slots that are free at this point; caps how many fetchers
        // are submitted during this pass.
        int maxFetchersToRun = numFetchers - runningFetchers.size();
        int count = 0;
        while (pendingHosts.peek() != null && !isShutdown.get()) {
          InputHost inputHost = null;
          try {
            inputHost = pendingHosts.take();
          } catch (InterruptedException e) {
            if (isShutdown.get()) {
              // Interrupted as part of shutdown: restore the interrupt flag and leave the
              // scheduling loop; the outer loop then exits on the shutdown flag.
              LOG.info(sourceDestNameTrimmed + ": " +
                  "Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
              Thread.currentThread().interrupt();
              break;
            } else {
              throw e;
            }
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug(sourceDestNameTrimmed + ": " + "Processing pending host: " +
                inputHost.toDetailedString());
          }
          if (inputHost.getNumPendingPartitions() > 0 && !isShutdown.get()) {
            Fetcher fetcher = constructFetcherForHost(inputHost, conf);
            // The fetcher is registered before the shutdown flag is re-checked.
            // NOTE(review): presumably so a concurrent shutdown can still find and stop
            // it - confirm against the shutdown path.
            runningFetchers.add(fetcher);
            if (isShutdown.get()) {
              LOG.info(sourceDestNameTrimmed + ": " + "hasBeenShutdown," +
                  "Breaking out of ShuffleScheduler Loop");
              break;
            }
            ListenableFuture<FetchResult> future = fetcherExecutor
                .submit(fetcher);
            Futures.addCallback(future, new FetchFutureCallback(fetcher), GuavaShim.directExecutor());
            // Stop once all slots that were free at the start of this pass are used.
            if (++count >= maxFetchersToRun) {
              break;
            }
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug(sourceDestNameTrimmed + ": " + "Skipping host: " +
                  inputHost.getIdentifier() +
                  " since it has no inputs to process");
            }
          }
        }
      } finally {
        lock.unlock();
      }
    }
  }
  // Record how long the scheduling loop ran, measured from startTime.
  shufflePhaseTime.setValue(System.currentTimeMillis() - startTime);
  LOG.info(sourceDestNameTrimmed + ": " + "Shutting down FetchScheduler, Was Interrupted: " +
      Thread.currentThread().isInterrupted());
  if (!fetcherExecutor.isShutdown()) {
    fetcherExecutor.shutdownNow();
  }
  return null;
}