in client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/CelebornScheduler.java [176:283]
protected Void callInternal() throws InterruptedException {
  // Scheduler loop. Each pass:
  //   1. waits (under the scheduler lock) until allInputTaskAttemptDone() holds and either a
  //      fetcher slot is free with a pending host available, or all input has been fetched;
  //   2. waits on the merge manager before scheduling more work;
  //   3. hands pending hosts to new fetchers, up to the number of free fetcher slots.
  // The loop ends when the scheduler is shut down or all input is fetched. An
  // InterruptedException is rethrown unless the scheduler has been shut down, in which case
  // the interrupt flag is restored and the loop is abandoned.
  while (!isShutdown.get() && !isAllInputFetched()) {
    synchronized (CelebornScheduler.this) {
      while (!allInputTaskAttemptDone()
          || ((celebornRunningFetchers.size() >= numFetchers || pendingHosts.isEmpty())
              && !isAllInputFetched())) {
        try {
          waitAndNotifyProgress();
        } catch (InterruptedException e) {
          if (isShutdown.get()) {
            LOG.info(
                srcNameTrimmed
                    + ": "
                    + "Interrupted while waiting for fetchers to complete "
                    + "and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
            Thread.currentThread().interrupt();
            // Leaves only this inner wait loop; execution continues at the merge-manager
            // wait below with the interrupt flag set.
            break;
          } else {
            throw e;
          }
        }
      }
    }

    // Ensure there's memory available before scheduling the next Fetcher.
    try {
      // If merge is on, block
      mergeManager.waitForInMemoryMerge();
      // In case usedMemory > memorylimit, wait until some memory is released
      mergeManager.waitForShuffleToMergeMemory();
    } catch (InterruptedException e) {
      if (isShutdown.get()) {
        LOG.info(
            srcNameTrimmed
                + ": "
                + "Interrupted while waiting for merge to complete and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
        Thread.currentThread().interrupt();
        break;
      } else {
        throw e;
      }
    }

    if (!isShutdown.get() && !isAllInputFetched()) {
      synchronized (CelebornScheduler.this) {
        int numFetchersToRun = numFetchers - celebornRunningFetchers.size();
        // Number of hosts handled in this pass; hosts whose partition was already fetched are
        // counted as well, since the increment happens before that check.
        int count = 0;
        while (count < numFetchersToRun && !isShutdown.get() && !isAllInputFetched()) {
          MapHost mapHost;
          try {
            mapHost = getHost(); // Leads to a wait.
          } catch (InterruptedException e) {
            if (isShutdown.get()) {
              LOG.info(
                  srcNameTrimmed
                      + ": "
                      + "Interrupted while waiting for host and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
              Thread.currentThread().interrupt();
              break;
            } else {
              throw e;
            }
          }
          if (mapHost == null) {
            break; // Check for the exit condition.
          }
          LOG.debug("{}: Processing pending host: {}", srcNameTrimmed, mapHost);
          if (!isShutdown.get()) {
            count++;
            if (LOG.isDebugEnabled()) {
              LOG.debug(
                  srcNameTrimmed + ": " + "Scheduling fetch for inputHost: {}",
                  mapHost.getHostIdentifier() + ":" + mapHost.getPartitionId());
            }
            if (isFirstRssPartitionFetch(mapHost)) {
              CelebornTezShuffleDataFetcher celebornTezShuffleDataFetcher =
                  constructCelebornFetcherForPartition(mapHost);
              celebornRunningFetchers.add(celebornTezShuffleDataFetcher);
              ListenableFuture<Void> future =
                  fetcherExecutor.submit(celebornTezShuffleDataFetcher);
              Futures.addCallback(
                  future,
                  new FetchFutureCallback(celebornTezShuffleDataFetcher),
                  MoreExecutors.directExecutor());
            } else {
              // Read the known-map count exactly once. A for-loop condition is re-evaluated
              // before every iteration, so calling getAndClearKnownMaps() there would, after
              // the first iteration, observe the list it had just cleared and stop after a
              // single decrement.
              // NOTE(review): assumes getAndClearKnownMaps() empties the host's list, as its
              // name indicates - confirm against MapHost.
              final int alreadyFetchedMaps = mapHost.getAndClearKnownMaps().size();
              for (int i = 0; i < alreadyFetchedMaps; i++) {
                remainingMaps.decrementAndGet();
              }
              LOG.info(
                  "Partition was fetched, remainingMaps desc, now value:{}",
                  remainingMaps.get());
            }
          }
        }
      }
    }
  }

  LOG.info(
      "Shutting down FetchScheduler for input: {}, wasInterrupted={}",
      srcNameTrimmed,
      Thread.currentThread().isInterrupted());
  if (!fetcherExecutor.isShutdown()) {
    fetcherExecutor.shutdownNow();
  }
  return null;
}