in client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java [1195:1234]
/**
 * Stops the running fetchers and shuts down the scheduler, reporter and fetcher executors.
 *
 * <p>The shutdown work runs at most once: {@code isShutdown} is flipped with
 * {@code getAndSet(true)}, so any later call skips the whole shutdown section. The interrupt
 * check at the top only logs; it does not clear the interrupt flag or clean up anything itself.
 *
 * <p>Failures while stopping an individual fetcher are logged and ignored so that the remaining
 * fetchers and the executors are still shut down.
 *
 * @throws InterruptedException declared by the signature; no statement in this method is seen to
 *     propagate it (exceptions from {@code fetcher.shutdown()} are caught and logged)
 */
public void shutdown() throws InterruptedException {
  if (Thread.currentThread().isInterrupted()) {
    // need to cleanup all FetchedInput (DiskFetchedInput, LocalDisFetchedInput), lockFile
    // As of now relying on job cleanup (when all directories would be cleared)
    LOG.info("{}: Thread interrupted. Need to cleanup the local dirs", srcNameTrimmed);
  }
  if (!isShutdown.getAndSet(true)) {
    // Shut down any pending fetchers
    LOG.info(
        "Shutting down pending fetchers on source {}: {}",
        srcNameTrimmed,
        rssRunningFetchers.size());
    lock.lock();
    try {
      wakeLoop.signal(); // signal the fetch-scheduler
      for (RssTezFetcherTask fetcher : rssRunningFetchers) {
        try {
          fetcher.shutdown(); // This could be parallelized.
        } catch (Exception e) {
          // Best effort: keep going so the other fetchers and the executors still get stopped.
          // The throwable is passed as the trailing argument so the stack trace is not lost.
          LOG.warn(
              "Error while stopping fetcher during shutdown. Ignoring and continuing. Message={}",
              e.getMessage(),
              e);
        }
      }
    } finally {
      // Always release the lock, even if signalling or iteration fails unexpectedly.
      lock.unlock();
    }
    // Executors are stopped only after the lock is released; each one is skipped when it is
    // absent or already shut down.
    if (this.schedulerExecutor != null && !this.schedulerExecutor.isShutdown()) {
      this.schedulerExecutor.shutdownNow();
    }
    if (this.reporterExecutor != null && !this.reporterExecutor.isShutdown()) {
      this.reporterExecutor.shutdownNow();
    }
    if (this.fetcherExecutor != null && !this.fetcherExecutor.isShutdown()) {
      this.fetcherExecutor.shutdownNow(); // Interrupts all running fetchers.
    }
  }
}