public void shutdown()

in hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java [2523:2696]


  /**
   * Tears down this instance's services in a fixed sequence.
   *
   * <p>In order, the method: stops the metrics logger and any service
   * plug-ins; kills and interrupts the data transfer servers; stops the
   * periodic scanners, disk balancer, HTTP server, volume/storage-location
   * checkers and pause monitor; clears {@code shouldRun}; waits for the
   * reconfiguration task, the transfer executor and the receiver thread
   * group; joins the transfer server threads; stops the IPC server, the EC
   * worker and the block pool services; unlocks storage; shuts down the
   * dataset and metrics; unregisters the info MBean; and finally wakes any
   * thread waiting on this object's monitor.
   *
   * <p>When {@code shutdownForUpgrade} is set, {@code shouldRun} is cleared
   * later and the receiver threads are given up to about one second
   * (measured from the moment peers were notified) before being interrupted.
   *
   * <p>Most failures of individual steps are caught and logged so that the
   * remaining steps still run. Interrupts received while sleeping or joining
   * are ignored.
   */
  public void shutdown() {
    stopMetricsLogger();
    // Stop every registered plug-in; a failure in one plug-in is logged and
    // does not prevent the others (or the rest of shutdown) from running.
    if (plugins != null) {
      for (ServicePlugin p : plugins) {
        try {
          p.stop();
          LOG.info("Stopped plug-in {}", p);
        } catch (Throwable t) {
          LOG.warn("ServicePlugin {} could not be stopped", p, t);
        }
      }
    }

    // Capture the list of block pool services now; it is handed to
    // BlockPoolManager#shutDownAll near the end of this method. An empty
    // list is used when there is no block pool manager.
    List<BPOfferService> bposArray = (this.blockPoolManager == null)
        ? new ArrayList<BPOfferService>()
        : this.blockPoolManager.getAllNamenodeThreads();
    // If shutdown is not for restart, set shouldRun to false early.
    if (!shutdownForUpgrade) {
      shouldRun = false;
    }

    // When shutting down for restart, DataXceiverServer is interrupted
    // in order to avoid any further acceptance of requests, but the peers
    // for block writes are not closed until the clients are notified.
    if (dataXceiverServer != null) {
      try {
        xserver.sendOOBToPeers();
        ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
        this.dataXceiverServer.interrupt();
      } catch (Exception e) {
        // Ignore, since the out of band messaging is advisory.
        // Note that this catch also covers kill() and interrupt() above;
        // anything thrown there is only visible at trace level.
        LOG.trace("Exception interrupting DataXceiverServer", e);
      }
    }

    // Record the time of initial notification
    // (used below to bound how long receiver threads are waited for when
    // shutting down for upgrade).
    long timeNotified = Time.monotonicNow();

    // Kill and interrupt the local DataXceiverServer as well. Unlike the
    // block above, no OOB message is sent and exceptions are not caught.
    if (localDataXceiverServer != null) {
      ((DataXceiverServer) this.localDataXceiverServer.getRunnable()).kill();
      this.localDataXceiverServer.interrupt();
    }

    // Terminate directory scanner and block scanner
    shutdownPeriodicScanners();
    shutdownDiskBalancer();

    // Stop the web server
    if (httpServer != null) {
      try {
        httpServer.close();
      } catch (Exception e) {
        LOG.warn("Exception shutting down DataNode HttpServer", e);
      }
    }

    // NOTE(review): volumeChecker is dereferenced without a null check,
    // unlike storageLocationChecker just below -- confirm it is always
    // initialized before shutdown() can be called.
    volumeChecker.shutdownAndWait(1, TimeUnit.SECONDS);

    if (storageLocationChecker != null) {
      storageLocationChecker.shutdownAndWait(1, TimeUnit.SECONDS);
    }

    if (pauseMonitor != null) {
      pauseMonitor.stop();
    }

    // shouldRun is set to false here to prevent certain threads from exiting
    // before the restart prep is done.
    this.shouldRun = false;
    
    // wait reconfiguration thread, if any, to exit
    shutdownReconfigurationTask();

    // NOTE(review): the message says 30 seconds but the timeout passed is
    // 15 seconds -- presumably HadoopExecutors.shutdown applies the timeout
    // more than once; confirm against that helper.
    LOG.info("Waiting up to 30 seconds for transfer threads to complete");
    HadoopExecutors.shutdown(this.xferService, LOG, 15L, TimeUnit.SECONDS);

    // wait for all data receiver threads to exit
    if (this.threadGroup != null) {
      int sleepMs = 2;
      while (true) {
        // When shutting down for restart, wait 1 second before forcing
        // termination of receiver threads.
        // For a normal shutdown this condition is true on the first pass, so
        // the group is interrupted immediately without polling. The elapsed
        // time is measured from timeNotified, which was taken before the
        // intervening shutdown steps above.
        if (!this.shutdownForUpgrade ||
            (this.shutdownForUpgrade && (Time.monotonicNow() - timeNotified
                > 1000))) {
          this.threadGroup.interrupt();
          break;
        }
        LOG.info("Waiting for threadgroup to exit, active threads is {}",
                 this.threadGroup.activeCount());
        // Nothing left to wait for: leave without interrupting the group.
        if (this.threadGroup.activeCount() == 0) {
          break;
        }
        // An interrupt during the sleep is ignored; the loop simply
        // re-checks its exit conditions on the next pass.
        // NOTE(review): the interrupt status is not restored -- confirm that
        // dropping it is intentional.
        try {
          Thread.sleep(sleepMs);
        } catch (InterruptedException e) {}
        // Grow the sleep by a factor of 1.5 (integer arithmetic) each pass,
        // capped at 200 ms.
        sleepMs = sleepMs * 3 / 2; // exponential backoff
        if (sleepMs > 200) {
          sleepMs = 200;
        }
      }
      this.threadGroup = null;
    }
    if (this.dataXceiverServer != null) {
      // wait for dataXceiverServer to terminate
      try {
        this.dataXceiverServer.join();
      } catch (InterruptedException ie) {
        // Interrupt is ignored: shutdown continues without waiting further
        // for this thread.
      }
    }
    if (this.localDataXceiverServer != null) {
      // wait for localDataXceiverServer to terminate
      try {
        this.localDataXceiverServer.join();
      } catch (InterruptedException ie) {
        // Interrupt is ignored: shutdown continues without waiting further
        // for this thread.
      }
    }
    // Zero out the active xceiver, packet responder and block recovery
    // worker counts reported through metrics.
    if (metrics != null) {
      metrics.setDataNodeActiveXceiversCount(0);
      metrics.setDataNodeReadActiveXceiversCount(0);
      metrics.setDataNodeWriteActiveXceiversCount(0);
      metrics.setDataNodePacketResponderCount(0);
      metrics.setDataNodeBlockRecoveryWorkerCount(0);
    }

   // IPC server needs to be shutdown late in the process, otherwise
   // shutdown command response won't get sent.
   if (ipcServer != null) {
      ipcServer.stop();
    }

    if (ecWorker != null) {
      ecWorker.shutDown();
    }

    // Shut down the block pool services captured at the top of the method.
    // An interrupt here is logged and the remaining cleanup still runs.
    if(blockPoolManager != null) {
      try {
        this.blockPoolManager.shutDownAll(bposArray);
      } catch (InterruptedException ie) {
        LOG.warn("Received exception in BlockPoolManager#shutDownAll", ie);
      }
    }
    
    // Release storage locks; a failure is logged and does not stop cleanup.
    if (storage != null) {
      try {
        this.storage.unlockAll();
      } catch (IOException ie) {
        LOG.warn("Exception when unlocking storage", ie);
      }
    }
    if (data != null) {
      data.shutdown();
    }
    if (metrics != null) {
      metrics.shutdown();
    }
    // NOTE(review): dnConf is dereferenced without a null check -- confirm
    // it is always set before shutdown() can be called.
    if (dnConf.diskStatsEnabled && diskMetrics != null) {
      diskMetrics.shutdownAndWait();
    }
    // Unregister the info MBean and clear the handle so that a repeated call
    // skips this step.
    if (dataNodeInfoBeanName != null) {
      MBeans.unregister(dataNodeInfoBeanName);
      dataNodeInfoBeanName = null;
    }
    if (shortCircuitRegistry != null) shortCircuitRegistry.shutdown();
    LOG.info("Shutdown complete.");
    synchronized(this) {
      // it is already false, but setting it again to avoid a findbug warning.
      this.shouldRun = false;
      // Notify the main thread.
      notifyAll();
    }
    // Final steps, performed after waiters have been notified: close the
    // tracer and invoke the dataset lock manager's leak check.
    // NOTE(review): tracer and dataSetLockManager are used without null
    // checks -- confirm both are always initialized.
    tracer.close();
    dataSetLockManager.lockLeakCheck();
  }