in hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java [2523:2696]
public void shutdown() {
stopMetricsLogger();
if (plugins != null) {
for (ServicePlugin p : plugins) {
try {
p.stop();
LOG.info("Stopped plug-in {}", p);
} catch (Throwable t) {
LOG.warn("ServicePlugin {} could not be stopped", p, t);
}
}
}
List<BPOfferService> bposArray = (this.blockPoolManager == null)
? new ArrayList<BPOfferService>()
: this.blockPoolManager.getAllNamenodeThreads();
// If shutdown is not for restart, set shouldRun to false early.
if (!shutdownForUpgrade) {
shouldRun = false;
}
// When shutting down for restart, DataXceiverServer is interrupted
// in order to avoid any further acceptance of requests, but the peers
// for block writes are not closed until the clients are notified.
if (dataXceiverServer != null) {
try {
xserver.sendOOBToPeers();
((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
this.dataXceiverServer.interrupt();
} catch (Exception e) {
// Ignore, since the out of band messaging is advisory.
LOG.trace("Exception interrupting DataXceiverServer", e);
}
}
// Record the time of initial notification
long timeNotified = Time.monotonicNow();
if (localDataXceiverServer != null) {
((DataXceiverServer) this.localDataXceiverServer.getRunnable()).kill();
this.localDataXceiverServer.interrupt();
}
// Terminate directory scanner and block scanner
shutdownPeriodicScanners();
shutdownDiskBalancer();
// Stop the web server
if (httpServer != null) {
try {
httpServer.close();
} catch (Exception e) {
LOG.warn("Exception shutting down DataNode HttpServer", e);
}
}
volumeChecker.shutdownAndWait(1, TimeUnit.SECONDS);
if (storageLocationChecker != null) {
storageLocationChecker.shutdownAndWait(1, TimeUnit.SECONDS);
}
if (pauseMonitor != null) {
pauseMonitor.stop();
}
// shouldRun is set to false here to prevent certain threads from exiting
// before the restart prep is done.
this.shouldRun = false;
// wait reconfiguration thread, if any, to exit
shutdownReconfigurationTask();
LOG.info("Waiting up to 30 seconds for transfer threads to complete");
HadoopExecutors.shutdown(this.xferService, LOG, 15L, TimeUnit.SECONDS);
// wait for all data receiver threads to exit
if (this.threadGroup != null) {
int sleepMs = 2;
while (true) {
// When shutting down for restart, wait 1 second before forcing
// termination of receiver threads.
if (!this.shutdownForUpgrade ||
(this.shutdownForUpgrade && (Time.monotonicNow() - timeNotified
> 1000))) {
this.threadGroup.interrupt();
break;
}
LOG.info("Waiting for threadgroup to exit, active threads is {}",
this.threadGroup.activeCount());
if (this.threadGroup.activeCount() == 0) {
break;
}
try {
Thread.sleep(sleepMs);
} catch (InterruptedException e) {}
sleepMs = sleepMs * 3 / 2; // exponential backoff
if (sleepMs > 200) {
sleepMs = 200;
}
}
this.threadGroup = null;
}
if (this.dataXceiverServer != null) {
// wait for dataXceiverServer to terminate
try {
this.dataXceiverServer.join();
} catch (InterruptedException ie) {
}
}
if (this.localDataXceiverServer != null) {
// wait for localDataXceiverServer to terminate
try {
this.localDataXceiverServer.join();
} catch (InterruptedException ie) {
}
}
if (metrics != null) {
metrics.setDataNodeActiveXceiversCount(0);
metrics.setDataNodeReadActiveXceiversCount(0);
metrics.setDataNodeWriteActiveXceiversCount(0);
metrics.setDataNodePacketResponderCount(0);
metrics.setDataNodeBlockRecoveryWorkerCount(0);
}
// IPC server needs to be shutdown late in the process, otherwise
// shutdown command response won't get sent.
if (ipcServer != null) {
ipcServer.stop();
}
if (ecWorker != null) {
ecWorker.shutDown();
}
if(blockPoolManager != null) {
try {
this.blockPoolManager.shutDownAll(bposArray);
} catch (InterruptedException ie) {
LOG.warn("Received exception in BlockPoolManager#shutDownAll", ie);
}
}
if (storage != null) {
try {
this.storage.unlockAll();
} catch (IOException ie) {
LOG.warn("Exception when unlocking storage", ie);
}
}
if (data != null) {
data.shutdown();
}
if (metrics != null) {
metrics.shutdown();
}
if (dnConf.diskStatsEnabled && diskMetrics != null) {
diskMetrics.shutdownAndWait();
}
if (dataNodeInfoBeanName != null) {
MBeans.unregister(dataNodeInfoBeanName);
dataNodeInfoBeanName = null;
}
if (shortCircuitRegistry != null) shortCircuitRegistry.shutdown();
LOG.info("Shutdown complete.");
synchronized(this) {
// it is already false, but setting it again to avoid a findbug warning.
this.shouldRun = false;
// Notify the main thread.
notifyAll();
}
tracer.close();
dataSetLockManager.lockLeakCheck();
}