in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java [1389:1473]
protected Void callInternal() throws InterruptedException {
  // Scheduler loop: keeps handing pending hosts to fetchers until either the
  // scheduler is flagged as shut down or no inputs remain (remainingMaps == 0).
  // Each pass has three phases: wait for a free slot + pending host, wait for
  // the merge manager, then launch as many fetchers as there are free slots.
  //
  // Interrupt policy (same in all three phases): an InterruptedException is
  // rethrown unless isShutdown is already set; in the shutdown case it is
  // logged, the thread's interrupt flag is restored, and the current loop is
  // left via break.
  while (!isShutdown.get() && remainingMaps.get() > 0) {

    // Phase 1: under the scheduler's monitor, wait while all fetcher slots are
    // in use or there is no pending host, for as long as inputs remain.
    synchronized (ShuffleScheduler.this) {
      while ((runningFetchers.size() >= numFetchers || pendingHosts.isEmpty())
          && remainingMaps.get() > 0) {
        try {
          waitAndNotifyProgress();
        } catch (InterruptedException ie) {
          if (!isShutdown.get()) {
            throw ie;
          }
          LOG.info(srcNameTrimmed + ": "
              + "Interrupted while waiting for fetchers to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
          Thread.currentThread().interrupt();
          // Leaves only this wait loop; control continues with phase 2 below.
          break;
        }
      }
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("{}: NumCompletedInputs: {}", srcNameTrimmed, (numInputs - remainingMaps.get()));
    }

    // Phase 2: make sure memory is available before another Fetcher is scheduled.
    try {
      // Blocks if a merge is on.
      mergeManager.waitForInMemoryMerge();
      // If usedMemory > memorylimit, wait until some memory has been released.
      mergeManager.waitForShuffleToMergeMemory();
    } catch (InterruptedException ie) {
      if (!isShutdown.get()) {
        throw ie;
      }
      LOG.info(srcNameTrimmed + ": "
          + "Interrupted while waiting for merge to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
      Thread.currentThread().interrupt();
      // Leaves the outer scheduler loop.
      break;
    }

    // Phase 3 is skipped when shutdown was flagged or the inputs ran out in the
    // meantime; the outer loop condition then decides whether to exit.
    if (isShutdown.get() || remainingMaps.get() <= 0) {
      continue;
    }

    synchronized (ShuffleScheduler.this) {
      // Number of free slots is sampled once, on entry to this phase.
      final int freeSlots = numFetchers - runningFetchers.size();
      int launched = 0;
      while (launched < freeSlots && !isShutdown.get() && remainingMaps.get() > 0) {
        MapHost nextHost;
        try {
          nextHost = getHost(); // Leads to a wait.
        } catch (InterruptedException ie) {
          if (!isShutdown.get()) {
            throw ie;
          }
          LOG.info(srcNameTrimmed + ": "
              + "Interrupted while waiting for host and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
          Thread.currentThread().interrupt();
          break;
        }

        if (nextHost == null) {
          // No host handed out; go back and re-check the exit condition.
          break;
        }

        LOG.debug("{}: Processing pending host: {}", srcNameTrimmed, nextHost);

        if (isShutdown.get()) {
          // Shutdown was flagged after the host was obtained: do not launch a
          // fetcher; the loop condition ends the loop on the next check.
          continue;
        }

        launched++;
        if (LOG.isDebugEnabled()) {
          LOG.debug(srcNameTrimmed + ": " + "Scheduling fetch for inputHost: {}",
              nextHost.getHostIdentifier() + ":" + nextHost.getPartitionId());
        }

        // Record the fetcher as running before submitting it, then attach the
        // completion callback on a direct executor.
        FetcherOrderedGrouped fetcher = constructFetcherForHost(nextHost);
        runningFetchers.add(fetcher);
        ListenableFuture<Void> fetchFuture = fetcherExecutor.submit(fetcher);
        Futures.addCallback(fetchFuture, new FetchFutureCallback(fetcher), GuavaShim.directExecutor());
      }
    }
  }

  // Loop finished: report whether the interrupt flag is set and stop the
  // fetcher executor if it has not been shut down already.
  LOG.info("Shutting down FetchScheduler for input: {}, wasInterrupted={}", srcNameTrimmed, Thread.currentThread().isInterrupted());
  if (!fetcherExecutor.isShutdown()) {
    fetcherExecutor.shutdownNow();
  }
  return null;
}