in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java [500:650]
protected Void callInternal() throws Exception {
  // Fetch-scheduler loop. Each round:
  //   1. waits (holding `lock`) until every input has been added and there is both a free
  //      fetcher slot and a pending partition, or until all inputs are fetched / shutdown;
  //   2. stops if a shuffle error has been recorded;
  //   3. otherwise submits one RssTezFetcherTask per eligible pending partition.
  // After the loop it records the shuffle phase duration and stops the fetcher executor.
  // Always returns null.
  while (!isShutdown.get() && !isAllInputFetched()) {
    awaitSchedulableWork();
    if (shuffleError != null) {
      LOG.warn("Shuffle error.", shuffleError);
      // InputContext has already been informed of a fatal error. Relying on
      // tez to kill the task.
      break;
    }
    LOG.debug("{}: NumCompletedInputs: {}", srcNameTrimmed, numCompletedInputs);
    if (!isAllInputFetched() && !isShutdown.get()) {
      scheduleFetchersForPendingPartitions();
    }
  }
  LOG.info("RssShuffleManager numInputs:{}", numInputs);
  shufflePhaseTime.setValue(System.currentTimeMillis() - startTime);
  LOG.info(
      "{}: Shutting down FetchScheduler, Was Interrupted: {}",
      srcNameTrimmed,
      Thread.currentThread().isInterrupted());
  if (!fetcherExecutor.isShutdown()) {
    fetcherExecutor.shutdownNow();
  }
  return null;
}

/**
 * Waits, while holding {@code lock}, until there is scheduling work to do.
 *
 * <p>The wait continues while not all inputs have been added, or while inputs are still
 * outstanding and either all {@code numFetchers} slots are busy or no partition is pending.
 * Each iteration reports progress to the input context and then waits on {@code wakeLoop} for
 * at most one second, so shutdown is noticed even when nobody signals the condition.
 *
 * @throws Exception if the wait is interrupted; declared broadly to mirror
 *     {@code callInternal}, because the checked exceptions of the helpers called here are not
 *     visible from this method
 */
private void awaitSchedulableWork() throws Exception {
  lock.lock();
  try {
    LOG.info(
        "numFetchers:{}, shuffleInfoEventsMap.size:{}, numInputs:{}.",
        numFetchers,
        shuffleInfoEventsMap.size(),
        numInputs);
    while (((rssRunningFetchers.size() >= numFetchers || pendingPartition.isEmpty())
            && !isAllInputFetched())
        || !isAllInputAdded()) {
      LOG.info(
          "isAllInputAdded:{}, rssRunningFetchers:{}, numFetchers:{}, pendingPartition:{}, "
              + "successRssPartitionSet:{}, allRssPartition:{} ",
          isAllInputAdded(),
          rssRunningFetchers,
          numFetchers,
          pendingPartition,
          successRssPartitionSet,
          allRssPartition);
      inputContext.notifyProgress();
      // Bounded wait: returns true when signalled, false when the second elapsed.
      boolean isSignal = wakeLoop.await(1000, TimeUnit.MILLISECONDS);
      if (isSignal) {
        LOG.info("wakeLoop is signal");
      }
      if (isShutdown.get()) {
        LOG.info("is shut down and break");
        break;
      }
    }
    LOG.info(
        "run out of while, is all inputadded:{}, fetched:{}",
        isAllInputAdded(),
        isAllInputFetched());
  } finally {
    lock.unlock();
  }
}

/**
 * Drains {@code pendingPartition} while holding {@code lock} and submits a
 * {@link RssTezFetcherTask} for every partition that is neither already fetched nor currently
 * being fetched.
 *
 * <p>Stops once as many fetchers have been submitted as there were free slots on entry, when
 * the queue is empty, or when shutdown is observed. Partitions that are taken from the queue
 * but skipped are not re-queued here.
 *
 * @throws Exception if taking from the queue is interrupted while the manager is not shut
 *     down; declared broadly to mirror {@code callInternal}, because the checked exceptions of
 *     the helpers called here are not visible from this method
 */
private void scheduleFetchersForPendingPartitions() throws Exception {
  lock.lock();
  try {
    LOG.info(
        "numFetchers:{},runningFetchers.size():{}.", numFetchers, rssRunningFetchers.size());
    int maxFetchersToRun = numFetchers - rssRunningFetchers.size();
    int count = 0;
    LOG.info("pendingPartition:{}", pendingPartition.peek());
    while (pendingPartition.peek() != null && !isShutdown.get()) {
      Integer partition;
      try {
        partition = pendingPartition.take();
      } catch (InterruptedException e) {
        if (isShutdown.get()) {
          LOG.info(
              "{}: Interrupted and hasBeenShutdown, Breaking out of ShuffleScheduler",
              srcNameTrimmed);
          // Restore the interrupt flag so the caller can still observe it.
          Thread.currentThread().interrupt();
          break;
        }
        throw e;
      }
      LOG.debug("{}: Processing pending partition: {}", srcNameTrimmed, partition);

      if (isShutdown.get()
          || successRssPartitionSet.contains(partition)
          || runningRssPartitionMap.contains(partition)) {
        // Any of the three conditions skips the partition, so the message names all of them
        // instead of blaming shutdown alone.
        LOG.debug(
            "{}: Skipping partition: {} since it is shut down, already fetched, "
                + "or already being fetched",
            srcNameTrimmed,
            partition);
        continue;
      }

      runningRssPartitionMap.add(partition);
      // INFO carries only this partition's servers; dumping the whole map for every scheduled
      // partition makes log volume grow quadratically with the partition count.
      LOG.info(
          "generate RssTezFetcherTask, partition:{}, rssWoker:{}",
          partition,
          partitionToServers.get(partition));
      LOG.debug("partitionToServers:{}", partitionToServers);
      RssTezFetcherTask fetcher =
          new RssTezFetcherTask(
              RssShuffleManager.this,
              inputContext,
              conf,
              inputManager,
              partition,
              shuffleId,
              applicationAttemptId,
              partitionToInput.get(partition),
              new HashSet<ShuffleServerInfo>(partitionToServers.get(partition)),
              rssAllBlockIdBitmapMap,
              rssSuccessBlockIdBitmapMap,
              numInputs,
              partitionToServers.size());
      // Registered before the shutdown check, exactly as before, so the fetcher is visible in
      // rssRunningFetchers even if it ends up not being submitted.
      rssRunningFetchers.add(fetcher);
      if (isShutdown.get()) {
        LOG.info("{}: hasBeenShutdown,Breaking out of ShuffleScheduler Loop", srcNameTrimmed);
        break;
      }
      ListenableFuture<FetchResult> future =
          fetcherExecutor.submit(fetcher); // add fetcher task
      Futures.addCallback(
          future, new FetchFutureCallback(fetcher), MoreExecutors.directExecutor());
      if (++count >= maxFetchersToRun) {
        break;
      }
    }
  } finally {
    lock.unlock();
  }
}