in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java [211:284]
/**
 * Scheduler loop: keeps handing pending hosts to fetchers until all inputs have completed,
 * shutdown has been requested, or a shuffle error has been recorded.
 *
 * <p>Each pass waits once on {@code wakeLoop} when every fetcher slot is in use or no host is
 * pending, then starts fetchers for pending hosts without exceeding {@code numFetchers}
 * concurrently running fetchers. When the loop ends, the fetcher executor is shut down if it
 * has not been shut down already.
 *
 * @return always {@code null}
 * @throws Exception if the thread is interrupted while waiting on {@code wakeLoop}, or while
 *     taking a pending host when shutdown has not been requested
 */
public Void call() throws Exception {
  while (!isShutdown.get() && numCompletedInputs.get() < numInputs) {
    lock.lock();
    try {
      // Nothing to schedule right now (no free slot or no pending host): wait for a signal.
      // This is a single await rather than a wait loop, so state is re-validated below.
      if (runningFetchers.size() >= numFetchers || pendingHosts.isEmpty()) {
        if (numCompletedInputs.get() < numInputs) {
          wakeLoop.await();
        }
      }
    } finally {
      lock.unlock();
    }

    if (shuffleError != null) {
      // InputContext has already been informed of a fatal error. Relying on
      // tez to kill the task.
      break;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("NumCompletedInputs: " + numCompletedInputs);
    }

    if (numCompletedInputs.get() < numInputs && !isShutdown.get()) {
      lock.lock();
      try {
        int maxFetchersToRun = numFetchers - runningFetchers.size();
        int count = 0;
        // The capacity check comes first so that a wake-up with no free slot
        // (maxFetchersToRun <= 0) schedules nothing instead of over-committing by one fetcher.
        while (count < maxFetchersToRun && pendingHosts.peek() != null && !isShutdown.get()) {
          InputHost inputHost = null;
          try {
            inputHost = pendingHosts.take();
          } catch (InterruptedException e) {
            if (isShutdown.get()) {
              LOG.info("Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
              break;
            } else {
              throw e;
            }
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug("Processing pending host: " + inputHost.toDetailedString());
          }
          if (inputHost.getNumPendingInputs() > 0 && !isShutdown.get()) {
            LOG.info("Scheduling fetch for inputHost: " + inputHost.getIdentifier());
            Fetcher fetcher = constructFetcherForHost(inputHost);
            runningFetchers.add(fetcher);
            if (isShutdown.get()) {
              // Shutdown raced with fetcher construction: stop here instead of submitting to
              // an executor that may already have been shut down.
              LOG.info("hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
              break;
            }
            ListenableFuture<FetchResult> future = fetcherExecutor.submit(fetcher);
            Futures.addCallback(future, new FetchFutureCallback(fetcher));
            count++;
          } else {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Skipping host: " + inputHost.getIdentifier()
                  + " since it has no inputs to process");
            }
          }
        }
      } finally {
        lock.unlock();
      }
    }
  }
  LOG.info("Shutting down FetchScheduler, Was Interrupted: " + Thread.currentThread().isInterrupted());
  // TODO NEWTEZ Maybe clean up inputs.
  if (!fetcherExecutor.isShutdown()) {
    fetcherExecutor.shutdownNow();
  }
  return null;
}