in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java [589:654]
public void close() {
try {
if (!isShutdown.getAndSet(true)) {
try {
logProgress();
} catch (Exception e) {
LOG.warn(
"Failed log progress while closing, ignoring and continuing shutdown. Message={}",
e.getMessage());
}
// Notify and interrupt the waiting scheduler thread
synchronized (this) {
notifyAll();
}
// Interrupt the ShuffleScheduler thread only if the close is invoked by another thread.
// If this is invoked on the same thread, then the shuffleRunner has already complete, and
// there's
// no point interrupting it.
// The interrupt is needed to unblock any merges or waits which may be happening, so that
// the thread can
// exit.
if (shuffleSchedulerThread != null
&& !Thread.currentThread().equals(shuffleSchedulerThread)) {
shuffleSchedulerThread.interrupt();
}
// Interrupt the fetchers.
for (RssTezShuffleDataFetcher fetcher : rssRunningFetchers) {
try {
fetcher.shutDown();
} catch (Exception e) {
LOG.warn(
"Error while shutting down fetcher. Ignoring and continuing shutdown. Message={}",
e.getMessage());
}
}
// Kill the Referee thread.
try {
referee.interrupt();
referee.join();
} catch (InterruptedException e) {
LOG.warn("Interrupted while shutting down referee. Ignoring and continuing shutdown");
Thread.currentThread().interrupt();
} catch (Exception e) {
LOG.warn(
"Error while shutting down referee. Ignoring and continuing shutdown. Message={}",
e.getMessage());
}
}
} finally {
long startTime = System.currentTimeMillis();
if (!fetcherExecutor.isShutdown()) {
// Ensure that fetchers respond to cancel request.
fetcherExecutor.shutdownNow();
}
long endTime = System.currentTimeMillis();
LOG.info(
"Shutting down fetchers for input: {}, shutdown timetaken: {} ms, "
+ "hasFetcherExecutorStopped: {}",
srcNameTrimmed,
(endTime - startTime),
hasFetcherExecutorStopped());
}
}