public void run()

in server/manager/src/main/java/org/apache/accumulo/manager/Manager.java [1105:1460]


  /**
   * Runs the Manager.
   *
   * <p>
   * The method proceeds in four phases:
   * <ol>
   * <li>Brings up the Manager's internal services.</li>
   * <li>Waits for any in-progress upgrade to finish.</li>
   * <li>Serves client requests until a shutdown is requested or the client service stops.</li>
   * <li>Tears everything down and releases the manager lock.</li>
   * </ol>
   *
   * <p>
   * The order of the steps below matters. The Thrift server is created before the manager lock is
   * acquired (ACCUMULO-4424). It is only started, and its address only advertised in the manager
   * lock data, after these steps have finished:
   * <ul>
   * <li>the upgrade has completed;</li>
   * <li>the compaction coordinator, splitter, and FaTE instances have been set up.</li>
   * </ul>
   *
   * @throws IllegalStateException in any of these cases:
   *         <ul>
   *         <li>the Thrift server cannot be created;</li>
   *         <li>the manager lock cannot be obtained or updated;</li>
   *         <li>the metadata upgrade fails or does not complete;</li>
   *         <li>this thread is interrupted while waiting on one of the setup or teardown
   *         steps.</li>
   *         </ul>
   */
  public void run() {
    final ServerContext context = getContext();

    // ACCUMULO-4424: create the Thrift server before getting the lock, as a sign of process health
    // while this Manager is a hot-standby. The server itself is not started until the end of
    // setup (see sa.startThriftServer below).
    // NOTE(review): confirm whether createThriftServer binds the port at this point.
    //
    // First create the handlers for the FaTE, Manager client, and compaction coordinator services.
    fateServiceHandler = new FateServiceHandler(this);
    managerClientHandler = new ManagerClientServiceHandler(this);
    compactionCoordinator = new CompactionCoordinator(this, fateRefs);

    ServerAddress sa;
    var processor = ThriftProcessorTypes.getManagerTProcessor(this, fateServiceHandler,
        compactionCoordinator.getThriftService(), managerClientHandler, getContext());

    try {
      sa = TServerUtils.createThriftServer(context, getHostname(), Property.MANAGER_CLIENTPORT,
          processor, "Manager", null, Property.MANAGER_MINTHREADS,
          Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK);
    } catch (UnknownHostException e) {
      throw new IllegalStateException("Unable to start server on host " + getHostname(), e);
    }

    // Block until we can obtain the ZK lock for the manager. The initial lock data uses
    // ThriftService.NONE. That lets the lock be allocated, but keeps any service from discovering
    // the Manager address for the COORDINATOR, FATE, and MANAGER services. The lock data is
    // replaced near the end of setup, once those services are ready.
    ServiceLockData sld;
    try {
      sld = getManagerLock(context.getServerPaths().createManagerPath());
    } catch (KeeperException | InterruptedException e) {
      throw new IllegalStateException("Exception getting manager lock", e);
    }
    // Setting the Manager state to HAVE_LOCK has the side-effect of
    // starting the upgrade process if necessary.
    setManagerState(ManagerState.HAVE_LOCK);

    // Set the host name **after** initially creating the lock. The lock data is updated below
    // with the correct address, which prevents clients from accessing the Manager until all of
    // the internal processes are started.
    setHostname(sa.address);

    recoveryManager = new RecoveryManager(this, timeToCacheRecoveryWalExistence);

    // Raise a nextEvent whenever ZooCache reports a table state change.
    context.getZooCache().addZooCacheWatcher(new TableStateWatcher((tableId, event) -> {
      TableState state = getTableManager().getTableState(tableId);
      log.debug("Table state transition to {} @ {}", state, event);
      nextEvent.event(tableId, "Table state in zookeeper changed for %s to %s", tableId, state);
    }));

    tableInformationStatusPool = ThreadPools.getServerThreadPools()
        .createExecutorService(getConfiguration(), Property.MANAGER_STATUS_THREAD_POOL_SIZE, false);

    tabletRefreshThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("Tablet refresh ")
        .numCoreThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MINTHREADS))
        .numMaxThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MAXTHREADS))
        .build();

    Thread statusThread = Threads.createThread("Status Thread", new StatusThread());
    statusThread.start();

    tserverSet.startListeningForTabletServerChanges();
    try {
      blockForTservers();
    } catch (InterruptedException ex) {
      // Restore the interrupt flag; setup continues regardless.
      Thread.currentThread().interrupt();
    }

    // Watch the recovery node so that changes to its children raise a nextEvent. The watcher
    // re-registers itself each time it fires.
    ZooReaderWriter zReaderWriter = context.getZooSession().asReaderWriter();
    try {
      zReaderWriter.getChildren(Constants.ZRECOVERY, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
          nextEvent.event("Noticed recovery changes %s", event.getType());
          try {
            // watcher only fires once, add it back
            zReaderWriter.getChildren(Constants.ZRECOVERY, this);
          } catch (Exception e) {
            log.error("Failed to add log recovery watcher back", e);
          }
        }
      });
    } catch (KeeperException | InterruptedException e) {
      throw new IllegalStateException("Unable to read " + Constants.ZRECOVERY, e);
    }

    // Collect the metrics producers now. They are only handed to metricsInfo (and metrics
    // initialized) after the upgrade has completed, further below.
    MetricsInfo metricsInfo = getContext().getMetricsInfo();
    ManagerMetrics managerMetrics = new ManagerMetrics(getConfiguration(), this);
    var producers = managerMetrics.getProducers(getConfiguration(), this);
    producers.add(balancerMetrics);

    // Create one TabletGroupWatcher each for the user, metadata, and root tablet stores. Each
    // watcher after the first is handed the previously created watcher:
    //   - metadata is handed user;
    //   - root is handed metadata.
    // Use the local references instead of indexing into the shared watchers list, so that this
    // wiring does not depend on the list being empty when run() begins.
    final TabletGroupWatcher userTableTGW =
        new TabletGroupWatcher(this, this.userTabletStore, null, managerMetrics) {
          @Override
          boolean canSuspendTablets() {
            // Always allow user data tablets to enter suspended state.
            return true;
          }
        };
    watchers.add(userTableTGW);

    final TabletGroupWatcher metadataTableTGW =
        new TabletGroupWatcher(this, this.metadataTabletStore, userTableTGW, managerMetrics) {
          @Override
          boolean canSuspendTablets() {
            // Allow metadata tablets to enter suspended state only if so configured. Generally
            // we'll want metadata tablets to be immediately reassigned, even if there's a global
            // table.suspension.duration setting.
            return getConfiguration().getBoolean(Property.MANAGER_METADATA_SUSPENDABLE);
          }
        };
    watchers.add(metadataTableTGW);

    final TabletGroupWatcher rootTableTGW =
        new TabletGroupWatcher(this, this.rootTabletStore, metadataTableTGW, managerMetrics) {
          @Override
          boolean canSuspendTablets() {
            // Never allow root tablet to enter suspended state.
            return false;
          }
        };
    watchers.add(rootTableTGW);

    // While upgrading, start each watcher once the upgrade has progressed far enough:
    //   - after UPGRADED_ZOOKEEPER, start root;
    //   - after UPGRADED_ROOT, also start metadata;
    //   - after UPGRADED_METADATA, also start user.
    // Poll once per second until the upgrade completes or fails.
    while (isUpgrading()) {
      UpgradeStatus currentStatus = upgradeCoordinator.getStatus();
      if (currentStatus == UpgradeStatus.FAILED || currentStatus == UpgradeStatus.COMPLETE) {
        break;
      }
      switch (currentStatus) {
        case UPGRADED_METADATA:
          if (rootTableTGW.getState() == NEW) {
            rootTableTGW.start();
          }
          if (metadataTableTGW.getState() == NEW) {
            metadataTableTGW.start();
          }
          if (userTableTGW.getState() == NEW) {
            userTableTGW.start();
          }
          break;
        case UPGRADED_ROOT:
          if (rootTableTGW.getState() == NEW) {
            rootTableTGW.start();
          }
          if (metadataTableTGW.getState() == NEW) {
            metadataTableTGW.start();
          }
          break;
        case UPGRADED_ZOOKEEPER:
          // Start processing the root table
          if (rootTableTGW.getState() == NEW) {
            rootTableTGW.start();
          }
          break;
        case FAILED:
        case COMPLETE:
        case INITIAL:
        default:
          break;
      }
      try {
        log.debug("Manager main thread is waiting for upgrade to complete");
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        throw new IllegalStateException("Interrupted while waiting for upgrade to complete", e);
      }
    }

    // In the case where an upgrade is not needed, then we may not
    // have stepped through all of the steps in the previous code
    // block. Make sure all TGWs are started.
    if (upgradeCoordinator.getStatus() != UpgradeStatus.FAILED) {
      if (rootTableTGW.getState() == NEW) {
        rootTableTGW.start();
      }
      if (metadataTableTGW.getState() == NEW) {
        metadataTableTGW.start();
      }
      if (userTableTGW.getState() == NEW) {
        userTableTGW.start();
      }
    }

    // Once we are sure the upgrade is complete, we can safely allow fate use.
    try {
      // wait for metadata upgrade running in background to complete
      if (upgradeMetadataFuture != null) {
        upgradeMetadataFuture.get();
      }
    } catch (ExecutionException | InterruptedException e) {
      throw new IllegalStateException("Metadata upgrade failed", e);
    }

    // Everything should be fully upgraded by this point. Check anyway, before starting FaTE and
    // the other processes that depend on these being available:
    //   - the metadata table;
    //   - any other tables that may have been created during the upgrade.
    if (isUpgrading()) {
      throw new IllegalStateException("Upgrade coordinator is unexpectedly not complete");
    }

    metricsInfo.addMetricsProducers(producers.toArray(new MetricsProducer[0]));
    metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(),
        sa.getAddress(), getResourceGroup()));

    Threads.createThread("Migration Cleanup Thread", new MigrationCleanupThread()).start();
    Threads.createThread("ScanServer Cleanup Thread", new ScanServerZKCleaner()).start();

    // Don't start the CompactionCoordinator until we have tservers and the upgrade is complete.
    compactionCoordinator.start();

    this.splitter = new Splitter(this);
    this.splitter.start();

    // Create the META (ZooKeeper-backed) and USER (FaTE table-backed) FaTE instances. Publish
    // them through fateRefs, then release anything waiting on fateReadyLatch.
    try {
      Predicate<ZooUtil.LockID> isLockHeld =
          lock -> ServiceLock.isLockHeld(context.getZooCache(), lock);
      var metaInstance = initializeFateInstance(context,
          new MetaFateStore<>(context.getZooSession(), managerLock.getLockID(), isLockHeld));
      var userInstance = initializeFateInstance(context, new UserFateStore<>(context,
          SystemTables.FATE.tableName(), managerLock.getLockID(), isLockHeld));

      if (!fateRefs.compareAndSet(null,
          Map.of(FateInstanceType.META, metaInstance, FateInstanceType.USER, userInstance))) {
        throw new IllegalStateException(
            "Unexpected previous fate reference map already initialized");
      }
      fateReadyLatch.countDown();
    } catch (KeeperException | InterruptedException e) {
      throw new IllegalStateException("Exception setting up FaTE instances", e);
    }

    // Periodic background tasks:
    //   - scan server metadata cleanup, every 10 minutes;
    //   - finding mergeable ranges, every MANAGER_TABLET_MERGEABILITY_INTERVAL.
    ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
        .scheduleWithFixedDelay(() -> ScanServerMetadataEntries.clean(context), 10, 10, MINUTES));

    var tabletMergeabilityInterval =
        getConfiguration().getDuration(Property.MANAGER_TABLET_MERGEABILITY_INTERVAL);
    ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
        new FindMergeableRangeTask(this), tabletMergeabilityInterval.toMillis(),
        tabletMergeabilityInterval.toMillis(), MILLISECONDS));

    // Make sure that we have a secret key (either a new one or an old one from ZK) before we start
    // the manager client service.
    Thread authenticationTokenKeyManagerThread = null;
    if (authenticationTokenKeyManager != null && keyDistributor != null) {
      log.info("Starting delegation-token key manager");
      try {
        keyDistributor.initialize();
      } catch (KeeperException | InterruptedException e) {
        throw new IllegalStateException("Exception setting up delegation-token key manager", e);
      }
      authenticationTokenKeyManagerThread =
          Threads.createThread("Delegation Token Key Manager", authenticationTokenKeyManager);
      authenticationTokenKeyManagerThread.start();
      boolean logged = false;
      while (!authenticationTokenKeyManager.isInitialized()) {
        // Print out a status message when we start waiting for the key manager to get initialized
        if (!logged) {
          log.info("Waiting for AuthenticationTokenKeyManager to be initialized");
          logged = true;
        }
        sleepUninterruptibly(200, MILLISECONDS);
      }
      // And log when we are initialized
      log.info("AuthenticationTokenSecretManager is initialized");
    }

    // Now that the Manager is up, start the ThriftServer
    sa.startThriftServer("Manager Client Service Handler");
    clientService = sa.server;
    log.info("Started Manager client service at {}", sa.address);

    // Replace the ServiceLockData information in the Manager lock node in ZooKeeper.
    // This advertises the address that clients can use to connect to the Manager
    // for the Coordinator, Fate, and Manager services. Do **not** do this until
    // after the upgrade process is finished and the dependent services are started.
    UUID uuid = sld.getServerUUID(ThriftService.NONE);
    ServiceDescriptors descriptors = new ServiceDescriptors();
    for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, ThriftService.COORDINATOR,
        ThriftService.FATE}) {
      descriptors
          .addService(new ServiceDescriptor(uuid, svc, getHostname(), this.getResourceGroup()));
    }

    sld = new ServiceLockData(descriptors);
    log.info("Setting manager lock data to {}", sld);
    try {
      managerLock.replaceLockData(sld);
    } catch (KeeperException | InterruptedException e) {
      throw new IllegalStateException("Exception updating manager lock", e);
    }

    // Main wait loop: poll every 500ms until a shutdown is requested or the client service stops
    // serving. An InterruptedException during the sleep triggers gracefulShutdown and the loop
    // keeps polling.
    // NOTE(review): if the loop exits through the isInterrupted() check below, the interrupt flag
    // is still set. A later join(...) in the teardown may then throw InterruptedException, which
    // skips the rest of the teardown, including managerLock.unlock(). Confirm this is intended.
    while (!isShutdownRequested() && clientService.isServing()) {
      if (Thread.currentThread().isInterrupted()) {
        log.info("Server process thread has been interrupted, shutting down");
        break;
      }
      try {
        Thread.sleep(500);
      } catch (InterruptedException e) {
        log.info("Interrupt Exception received, shutting down");
        gracefulShutdown(context.rpcCreds());
      }
    }

    // Teardown. The joins below share a single deadline of MAX_CLEANUP_WAIT_TIME from this point.
    log.debug("Shutting down fate.");
    getFateRefs().keySet().forEach(type -> fate(type).shutdown(0, MINUTES));

    splitter.stop();

    log.debug("Stopping Thrift Servers");
    sa.server.stop();

    final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
    try {
      statusThread.join(remaining(deadline));
    } catch (InterruptedException e) {
      throw new IllegalStateException("Exception stopping status thread", e);
    }

    tableInformationStatusPool.shutdownNow();
    tabletRefreshThreadPool.shutdownNow();

    compactionCoordinator.shutdown();

    // Signal that we want it to stop, and wait for it to do so.
    if (authenticationTokenKeyManager != null) {
      authenticationTokenKeyManager.gracefulStop();
      try {
        if (null != authenticationTokenKeyManagerThread) {
          authenticationTokenKeyManagerThread.join(remaining(deadline));
        }
      } catch (InterruptedException e) {
        throw new IllegalStateException("Exception waiting on delegation-token key manager", e);
      }
    }

    // quit, even if the tablet servers somehow jam up and the watchers
    // don't stop
    for (TabletGroupWatcher watcher : watchers) {
      try {
        watcher.join(remaining(deadline));
      } catch (InterruptedException e) {
        throw new IllegalStateException("Exception waiting on watcher", e);
      }
    }
    getShutdownComplete().set(true);
    log.info("stop requested. exiting ... ");
    try {
      managerLock.unlock();
    } catch (Exception e) {
      log.warn("Failed to release Manager lock", e);
    }
  }