in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java [1653:1773]
/**
 * Scheduler loop that launches RSS fetchers until the scheduler is shut down or every input has
 * been fetched.
 *
 * <p>Each iteration:
 *
 * <ol>
 *   <li>Waits on the {@code RssShuffleScheduler} monitor until all input events have been
 *       received and either (a) a fetcher slot is free and a pending host exists, or (b) all
 *       inputs are already fetched.
 *   <li>Waits for the merge manager to finish any in-memory merge and to release shuffle memory.
 *   <li>Under the scheduler monitor, takes up to {@code numFetchers - rssRunningFetchers.size()}
 *       hosts via {@code getHost()}. For the first fetch of a partition it submits an
 *       {@code RssTezShuffleDataFetcher} to {@code fetcherExecutor}. Otherwise it subtracts the
 *       host's known maps from {@code remainingMaps}.
 * </ol>
 *
 * <p>On exit {@code fetcherExecutor} is stopped with {@code shutdownNow()} if it is not already
 * shut down.
 *
 * @return always {@code null}
 * @throws InterruptedException if interrupted while waiting and the scheduler has not been shut
 *     down. An interrupt after shutdown instead re-asserts the thread's interrupt flag and ends
 *     the loop.
 * @throws IOException may be propagated from the helpers called in this loop
 * @throws TezException may be propagated from the helpers called in this loop
 * @throws RssException may be propagated from the helpers called in this loop
 */
protected Void callInternal()
    throws IOException, InterruptedException, TezException, RssException {
  while (!isShutdown.get() && !isAllInputFetched()) {
    LOG.info("Now allEventsReceived: {}", allEventsReceived());
    synchronized (RssShuffleScheduler.this) {
      // Block until every input event has arrived and there is both a free fetcher slot and a
      // pending host to work on (or nothing is left to fetch).
      while (!allEventsReceived()
          || ((rssRunningFetchers.size() >= numFetchers || pendingHosts.isEmpty())
              && !isAllInputFetched())) {
        try {
          LOG.info(
              "RssShuffleSchedulerCallable, wait pending hosts, pendingHosts:{}.",
              pendingHosts.isEmpty());
          waitAndNotifyProgress();
        } catch (InterruptedException e) {
          if (isShutdown.get()) {
            LOG.info(
                srcNameTrimmed
                    + ": "
                    + "Interrupted while waiting for fetchers to complete "
                    + "and hasBeenShutdown. Breaking out of ShuffleSchedulerCallable loop");
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            break;
          } else {
            throw e;
          }
        }
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug(
          srcNameTrimmed + ": " + "NumCompletedInputs: {}", numInputs - remainingMaps.get());
    }
    // Ensure there's memory available before scheduling the next Fetcher.
    try {
      // If merge is on, block
      mergeManager.waitForInMemoryMerge();
      // In case usedMemory > memorylimit, wait until some memory is released
      mergeManager.waitForShuffleToMergeMemory();
    } catch (InterruptedException e) {
      if (isShutdown.get()) {
        LOG.info(
            srcNameTrimmed
                + ": Interrupted while waiting for merge to complete and hasBeenShutdown. "
                + "Breaking out of ShuffleSchedulerCallable loop");
        Thread.currentThread().interrupt();
        break;
      } else {
        throw e;
      }
    }
    if (!isShutdown.get() && !isAllInputFetched()) {
      synchronized (RssShuffleScheduler.this) {
        int numFetchersToRun = numFetchers - rssRunningFetchers.size();
        int count = 0;
        while (count < numFetchersToRun && !isShutdown.get() && !isAllInputFetched()) {
          MapHost mapHost;
          try {
            mapHost = getHost(); // Leads to a wait.
          } catch (InterruptedException e) {
            if (isShutdown.get()) {
              LOG.info(
                  srcNameTrimmed
                      + ": Interrupted while waiting for host and hasBeenShutdown. "
                      + "Breaking out of ShuffleSchedulerCallable loop");
              Thread.currentThread().interrupt();
              break;
            } else {
              throw e;
            }
          }
          if (mapHost == null) {
            LOG.info("Get null mapHost and break out.");
            break; // Check for the exit condition.
          }
          if (LOG.isDebugEnabled()) {
            LOG.debug(srcNameTrimmed + ": " + "Processing pending host: " + mapHost.toString());
          }
          if (!isShutdown.get()) {
            count++;
            if (LOG.isDebugEnabled()) {
              LOG.debug(
                  srcNameTrimmed + ": " + "Scheduling fetch for inputHost: {}",
                  mapHost.getHostIdentifier() + ":" + mapHost.getPartitionId());
            }
            if (isFirstRssPartitionFetch(mapHost)) {
              int partitionId = mapHost.getPartitionId();
              RssTezShuffleDataFetcher rssTezShuffleDataFetcher =
                  constructRssFetcherForPartition(mapHost, partitionToServers.get(partitionId));
              rssRunningFetchers.add(rssTezShuffleDataFetcher);
              ListenableFuture<Void> future = fetcherExecutor.submit(rssTezShuffleDataFetcher);
              Futures.addCallback(
                  future,
                  new FetchFutureCallback(rssTezShuffleDataFetcher),
                  MoreExecutors.directExecutor());
            } else {
              // NOTE(review): presumably the first fetcher for this partition already pulls the
              // whole partition, so these inputs are counted as done here — confirm against
              // RssTezShuffleDataFetcher.
              // getAndClearKnownMaps() clears the host's map list, so it must be called exactly
              // once. Calling it in the loop condition would decrement only once, no matter how
              // many maps the host had.
              int knownMapCount = mapHost.getAndClearKnownMaps().size();
              for (int i = 0; i < knownMapCount; i++) {
                remainingMaps.decrementAndGet();
              }
              LOG.info(
                  "Partition was fetched, remainingMaps desc, now value:{}",
                  remainingMaps.get());
            }
          }
        }
      }
    }
  }
  LOG.info(
      "Shutting down FetchScheduler for input: {}, wasInterrupted={}",
      srcNameTrimmed,
      Thread.currentThread().isInterrupted());
  if (!fetcherExecutor.isShutdown()) {
    fetcherExecutor.shutdownNow();
  }
  return null;
}