in client-tez/tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/CelebornShuffleManager.java [182:279]
protected Void callInternal() throws Exception {
while (!isShutdown.get() && !isAllInputFetched()) {
lock.lock();
try {
while (((celebornRunningFetchers.size() >= numFetchers || pendingPartition.isEmpty())
&& !isAllInputFetched())
|| !isAllInputAdded()) {
inputContext.notifyProgress();
boolean ret = wakeLoop.await(1000, TimeUnit.MILLISECONDS);
if (isShutdown.get()) {
break;
}
}
} finally {
lock.unlock();
}
if (shuffleError != null) {
// InputContext has already been informed of a fatal error. Relying on
// tez to kill the task.
break;
}
LOG.debug("{}: NumCompletedInputs: {}", sourceDestNameTrimmed, numCompletedInputs);
if (!isAllInputFetched() && !isShutdown.get()) {
lock.lock();
try {
int maxFetchersToRun = numFetchers - celebornRunningFetchers.size();
int count = 0;
while (pendingPartition.peek() != null && !isShutdown.get()) {
Integer partition = null;
try {
partition = pendingPartition.take();
} catch (InterruptedException e) {
if (isShutdown.get()) {
LOG.info(
sourceDestNameTrimmed
+ ": "
+ "Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler Loop");
Thread.currentThread().interrupt();
break;
} else {
throw e;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(
sourceDestNameTrimmed + ": " + "Processing pending partition: " + partition);
}
if (!isShutdown.get()
&& (!successRssPartitionSet.contains(partition)
&& !runningRssPartitionMap.contains(partition))) {
runningRssPartitionMap.add(partition);
CelebornFetcher fetcher = constructFetcherForCeleborn(partition);
celebornRunningFetchers.add(fetcher);
if (isShutdown.get()) {
LOG.info(
sourceDestNameTrimmed
+ ": "
+ "hasBeenShutdown,"
+ "Breaking out of ShuffleScheduler Loop");
break;
}
ListenableFuture<FetchResult> future = fetcherExecutor.submit(fetcher);
Futures.addCallback(
future, new FetchFutureCallback(fetcher), GuavaShim.directExecutor());
if (++count >= maxFetchersToRun) {
break;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(
sourceDestNameTrimmed
+ ": "
+ "Skipping partition: "
+ partition
+ " since is shutdown");
}
}
}
} finally {
lock.unlock();
}
}
}
shufflePhaseTime.setValue(System.currentTimeMillis() - startTime);
LOG.info(
sourceDestNameTrimmed
+ ": "
+ "Shutting down FetchScheduler, Was Interrupted: "
+ Thread.currentThread().isInterrupted());
if (!fetcherExecutor.isShutdown()) {
fetcherExecutor.shutdownNow();
}
return null;
}