public void run()

in server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java [535:771]


  /**
   * Main body of the tablet server process.
   *
   * <p>Startup: performs the server login, seeds the authentication-key watcher (when one is
   * configured), starts the tablet client service, registers metrics producers, calls
   * {@code announceExistence()} and installs the tablet server lock on the context, initializes the
   * WAL marker, optionally starts recovery-log sorting, and registers the on-demand tablet unload
   * check with {@code watchCriticalFixedDelay}.
   *
   * <p>Steady state: until shutdown is requested or this thread is interrupted, waits for queued
   * {@link ManagerMessage}s and sends them to the current manager. A message whose send fails is
   * put back at the head of the queue so it is retried on a later connection.
   *
   * <p>Shutdown: informs the manager that this server is stopping (halting the process if that
   * cannot be done), unloads online tablets level by level (USER, then METADATA, then ROOT) while
   * forwarding queued messages to the manager on a best-effort basis, stops the Thrift server,
   * closes the volume manager, marks shutdown complete and releases the tablet server lock.
   */
  public void run() {
    SecurityUtil.serverLogin(getConfiguration());

    if (authKeyWatcher != null) {
      log.info("Seeding ZooKeeper watcher for authentication keys");
      try {
        authKeyWatcher.updateAuthKeys();
      } catch (KeeperException | InterruptedException e) {
        // TODO Does there need to be a better check? What are the error conditions that we'd fall
        // out here? AUTH_FAILURE?
        // If we get the error, do we just put it on a timer and retry the exists(String, Watcher)
        // call?
        log.error("Failed to perform initial check for authentication tokens in"
            + " ZooKeeper. Delegation token authentication will be unavailable.", e);
      }
    }
    try {
      clientAddress = startTabletClientService();
    } catch (UnknownHostException e1) {
      throw new RuntimeException("Failed to start the tablet client service", e1);
    }

    MetricsInfo metricsInfo = context.getMetricsInfo();

    metrics = new TabletServerMetrics(this);
    updateMetrics = new TabletServerUpdateMetrics();
    scanMetrics = new TabletServerScanMetrics(this.resourceManager::getOpenFiles);
    sessionManager.setZombieCountConsumer(scanMetrics::setZombieScanThreads);
    mincMetrics = new TabletServerMinCMetrics();
    pausedMetrics = new PausedCompactionMetrics();
    blockCacheMetrics = new BlockCacheMetrics(this.resourceManager.getIndexCache(),
        this.resourceManager.getDataCache(), this.resourceManager.getSummaryCache());

    metricsInfo.addMetricsProducers(this, metrics, updateMetrics, scanMetrics, mincMetrics,
        pausedMetrics, blockCacheMetrics);
    metricsInfo.init(MetricsInfo.serviceTags(context.getInstanceName(), getApplicationName(),
        clientAddress, getResourceGroup()));

    announceExistence();
    getContext().setServiceLock(tabletServerLock);

    try {
      walMarker.initWalMarker(getTabletSession());
    } catch (Exception e) {
      log.error("Unable to create WAL marker node in zookeeper", e);
      throw new RuntimeException(e);
    }

    int threadPoolSize =
        getContext().getConfiguration().getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
    if (threadPoolSize > 0) {
      try {
        // Attempt to process all existing log sorting work and start a background
        // thread to look for log sorting work in the future
        logSorter.startWatchingForRecoveryLogs(threadPoolSize);
      } catch (Exception ex) {
        // include the cause so the startup failure is diagnosable from this log line
        log.error("Error starting LogSorter", ex);
        throw new RuntimeException(ex);
      }
    } else {
      log.info(
          "Log sorting for tablet recovery is disabled, TSERV_WAL_SORT_MAX_CONCURRENT is less than 1.");
    }

    final AccumuloConfiguration aconf = getConfiguration();

    final long onDemandUnloaderInterval =
        aconf.getTimeInMillis(Property.TSERV_ONDEMAND_UNLOADER_INTERVAL);
    watchCriticalFixedDelay(aconf, onDemandUnloaderInterval,
        () -> evaluateOnDemandTabletsForUnload());

    while (!isShutdownRequested()) {
      if (Thread.currentThread().isInterrupted()) {
        log.info("Server process thread has been interrupted, shutting down");
        break;
      }

      updateIdleStatus(getOnlineTablets().isEmpty());

      // send all of the pending messages
      try {
        ManagerMessage mm = null;
        ManagerClientService.Client iface = null;

        try {
          // wait until a message is ready to send, or a server stop
          // was requested
          while (mm == null && !isShutdownRequested() && !Thread.currentThread().isInterrupted()) {
            mm = managerMessages.poll(1, TimeUnit.SECONDS);
            updateIdleStatus(getOnlineTablets().isEmpty());
          }

          // have a message to send to the manager, so grab a
          // connection
          HostAndPort managerHost = getManagerAddress();
          iface = managerConnection(managerHost);
          TServiceClient client = iface;

          // if while loop does not execute at all and mm != null,
          // then finally block should place mm back on queue
          while (!Thread.currentThread().isInterrupted() && !isShutdownRequested() && mm != null
              && client != null && client.getOutputProtocol() != null
              && client.getOutputProtocol().getTransport() != null
              && client.getOutputProtocol().getTransport().isOpen()) {
            try {
              mm.send(getContext().rpcCreds(), getClientAddressString(), iface);
              mm = null;
            } catch (TException ex) {
              log.warn("Error sending message: queuing message again");
              managerMessages.putFirst(mm);
              mm = null;
              throw ex;
            }

            // if any messages are immediately available grab em and
            // send them
            mm = managerMessages.poll();
            updateIdleStatus(getOnlineTablets().isEmpty());
          }

        } finally {

          if (mm != null) {
            managerMessages.putFirst(mm);
          }
          returnManagerConnection(iface);

          sleepUninterruptibly(1, TimeUnit.SECONDS);
        }
      } catch (InterruptedException e) {
        log.info("Interrupt Exception received, shutting down");
        gracefulShutdown(getContext().rpcCreds());
      } catch (Exception e) {
        // may have lost connection with manager
        // loop back to the beginning and wait for a new one
        // this way we survive manager failures
        log.error("{}: TServerInfo: Exception. Manager down?", getClientAddressString(), e);
      }
    }

    // Tell the Manager we are shutting down so that it doesn't try
    // to assign tablets.
    ManagerClientService.Client iface = managerConnection(getManagerAddress());
    try {
      if (iface == null) {
        // managerConnection() may return null (the message loop above guards against this too).
        // Calling through a null client would throw a NullPointerException that escapes run()
        // and skips the rest of shutdown, so treat it exactly like a failed RPC.
        log.error("Unable to connect to Manager to report that we are shutting down,"
            + " halting server");
        Halt.halt("Error informing Manager that we are shutting down, exiting!", -1);
      } else {
        iface.tabletServerStopping(TraceUtil.traceInfo(), getContext().rpcCreds(),
            getClientAddressString());
      }
    } catch (TException e) {
      log.error("Error informing Manager that we are shutting down, halting server", e);
      Halt.halt("Error informing Manager that we are shutting down, exiting!", -1);
    } finally {
      returnManagerConnection(iface);
    }

    // Best-effort attempt at unloading tablets.
    log.debug("Unloading tablets");
    final List<Future<?>> futures = new ArrayList<>();
    final ThreadPoolExecutor tpe = getContext().threadPools()
        .getPoolBuilder(ThreadPoolNames.TSERVER_SHUTDOWN_UNLOAD_TABLET_POOL).numCoreThreads(8)
        .numMaxThreads(16).build();

    iface = managerConnection(getManagerAddress());
    // Without a manager connection there is nobody to forward messages to; skip sending rather
    // than failing with a NullPointerException inside the unload loop or the finally block.
    boolean managerDown = iface == null;
    if (managerDown) {
      log.debug("No Manager connection available, messages will not be sent during unloading");
    }

    try {
      // Unload user tablets first, then metadata, then root.
      for (DataLevel level : new DataLevel[] {DataLevel.USER, DataLevel.METADATA, DataLevel.ROOT}) {
        getOnlineTablets().keySet().forEach(ke -> {
          if (DataLevel.of(ke.tableId()) == level) {
            futures.add(tpe.submit(new UnloadTabletHandler(this, ke, TUnloadTabletGoal.UNASSIGNED,
                SteadyTime.from(System.currentTimeMillis(), TimeUnit.MILLISECONDS))));
          }
        });
        while (!futures.isEmpty()) {
          Iterator<Future<?>> unloads = futures.iterator();
          while (unloads.hasNext()) {
            Future<?> f = unloads.next();
            if (f.isDone()) {
              if (!managerDown) {
                // poll() returns null when the queue is empty, and nothing in this method
                // guarantees that each completed unload queued a message, so only send when
                // there is one.
                ManagerMessage mm = managerMessages.poll();
                if (mm != null) {
                  try {
                    mm.send(getContext().rpcCreds(), getClientAddressString(), iface);
                  } catch (TException e) {
                    managerDown = true;
                    log.debug("Error sending message to Manager during tablet unloading, msg: {}",
                        e.getMessage());
                  }
                }
              }
              unloads.remove();
            }
          }
          // only wait when unloads are still outstanding
          if (!futures.isEmpty()) {
            log.debug("Waiting on {} {} tablets to close.", futures.size(), level);
            UtilWaitThread.sleep(1000);
          }
        }
        log.debug("All {} tablets unloaded", level);
      }
    } finally {
      if (!managerDown) {
        // drain whatever is still queued to the manager
        try {
          ManagerMessage mm;
          while ((mm = managerMessages.poll()) != null) {
            mm.send(getContext().rpcCreds(), getClientAddressString(), iface);
          }
        } catch (TException e) {
          log.debug("Error sending message to Manager during tablet unloading, msg: {}",
              e.getMessage());
        }
      }
      returnManagerConnection(iface);
      tpe.shutdown();
    }

    log.debug("Stopping Thrift Servers");
    if (server != null) {
      server.stop();
    }

    try {
      log.debug("Closing filesystems");
      getVolumeManager().close();
    } catch (IOException e) {
      log.warn("Failed to close filesystem : {}", e.getMessage(), e);
    }

    context.getLowMemoryDetector().logGCInfo(getConfiguration());

    getShutdownComplete().set(true);
    log.info("TServerInfo: stop requested. exiting ... ");
    try {
      tabletServerLock.unlock();
    } catch (Exception e) {
      log.warn("Failed to release tablet server lock", e);
    }
  }