private TableMgmtStats manageTablets()

in server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java [427:700]


  private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,
      TabletManagementParameters tableMgmtParams,
      SortedMap<TServerInstance,TabletServerStatus> currentTServers, boolean isFullScan)
      throws TException, DistributedStoreException, WalMarkerException, IOException {

    final TableMgmtStats tableMgmtStats = new TableMgmtStats();
    final boolean shuttingDownAllTabletServers =
        tableMgmtParams.getServersToShutdown().equals(currentTServers.keySet());
    if (shuttingDownAllTabletServers && !isFullScan) {
      // If we are shutting down all of the TabletServers, then don't process any events
      // from the EventCoordinator.
      LOG.debug("Partial scan requested, but aborted due to shutdown of all TabletServers");
      return tableMgmtStats;
    }

    int unloaded = 0;

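    // Per-destination work lists (assignments, unloads, suspensions, volume
    // replacements) that are buffered here and flushed to servers in batches.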
    TabletLists tLists = new TabletLists(currentTServers, tableMgmtParams.getGroupedTServers(),
        tableMgmtParams.getServersToShutdown());

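    // Generator used below to queue compaction jobs for tablets that need them;
    // it is nulled out if the compaction configuration fails validation so that
    // no jobs are queued until the configuration is fixed.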
    CompactionJobGenerator compactionGenerator =
        new CompactionJobGenerator(new ServiceEnvironmentImpl(manager.getContext()),
            tableMgmtParams.getCompactionHints(), tableMgmtParams.getSteadyTime());

    try {
      CheckCompactionConfig.validate(manager.getConfiguration(), Level.TRACE);
      this.metrics.clearCompactionServiceConfigurationError();
    } catch (RuntimeException | ReflectiveOperationException e) {
      this.metrics.setCompactionServiceConfigurationError();
      LOG.error(
          "Error validating compaction configuration, all {} compactions are paused until the configuration is fixed.",
          store.getLevel(), e);
      compactionGenerator = null;
    }

    Set<TServerInstance> filteredServersToShutdown =
        new HashSet<>(tableMgmtParams.getServersToShutdown());

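    // Main loop: examine each tablet the management iterator flagged and queue
    // the appropriate assignment, unload, split, compaction, or recovery work.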
    while (iter.hasNext()) {
      final TabletManagement mti = iter.next();
      if (mti == null) {
        throw new IllegalStateException("State store returned a null ManagerTabletInfo object");
      }

      final TabletMetadata tm = mti.getTabletMetadata();

      final String mtiError = mti.getErrorMessage();
      if (mtiError != null) {
        // An error happened on the TabletServer in the TabletManagementIterator
        // when trying to process this extent.
        LOG.warn(
            "Error on TabletServer trying to get Tablet management information for extent: {}. Error message: {}",
            tm.getExtent(), mtiError);
        this.metrics.incrementTabletGroupWatcherError(this.store.getLevel());
        tableMgmtStats.tabletsWithErrors++;
        continue;
      }

      final TableId tableId = tm.getTableId();
      // ignore entries for tables that do not exist in ZooKeeper
      if (manager.getTableManager().getTableState(tableId) == null) {
        continue;
      }

      // Don't overwhelm the tablet servers with work
      if (tLists.unassigned.size() + unloaded
          > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()
          || tLists.volumeReplacements.size() > 1000) {
        flushChanges(tLists);
        tLists.reset();
        unloaded = 0;
      }

      final TableConfiguration tableConf = manager.getContext().getTableConfiguration(tableId);

      TabletState state = TabletState.compute(tm, currentTServers.keySet());
      if (state == TabletState.ASSIGNED_TO_DEAD_SERVER) {
        /*
         * This code exists to deal with a race condition caused by two threads running in this
         * class that compute tablet actions. One thread does full scans and the other reacts to
         * events and does partial scans. Below is an example of the race condition this is
         * handling.
         *
         * - TGW Thread 1 : reads the set of tablet servers and it's empty
         *
         * - TGW Thread 2 : reads the set of tablet servers and it's [TS1]
         *
         * - TGW Thread 2 : Sees tabletX without a location and assigns it to TS1
         *
         * - TGW Thread 1 : Sees tabletX assigned to TS1 and assumes it's assigned to a dead tablet
         * server because its set of live servers is the empty set.
         *
         * To deal with this race condition, this code recomputes the tablet state using the latest
         * tservers when a tablet is seen assigned to a dead tserver.
         */

        TabletState newState = TabletState.compute(tm, manager.tserversSnapshot().getTservers());
        if (newState != state) {
          LOG.debug("Tablet state changed when using latest set of tservers {} {} {}",
              tm.getExtent(), state, newState);
          state = newState;
        }
      }
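      // Tally how many tablets were seen in each state during this pass.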
      tableMgmtStats.counts[state.ordinal()]++;

      // This is final because nothing in this method should change the goal. All computation of the
      // goal should be done in TabletGoalState.compute() so that all parts of the Accumulo code
      // will compute a consistent goal.
      final TabletGoalState goal =
          TabletGoalState.compute(tm, state, manager.tabletBalancer, tableMgmtParams);

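      // The set of management actions the iterator determined this tablet needs.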
      final Set<ManagementAction> actions = mti.getActions();

      if (actions.contains(ManagementAction.NEEDS_RECOVERY) && goal != TabletGoalState.HOSTED) {
        LOG.warn("Tablet has wals, but goal is not hosted. Tablet: {}, goal:{}", tm.getExtent(),
            goal);
      }

      if (actions.contains(ManagementAction.NEEDS_VOLUME_REPLACEMENT)) {
        tableMgmtStats.totalVolumeReplacements++;
        if (state == TabletState.UNASSIGNED || state == TabletState.SUSPENDED) {
          var volRep =
              VolumeUtil.computeVolumeReplacements(tableMgmtParams.getVolumeReplacements(), tm);
          if (volRep.logsToRemove.size() + volRep.filesToRemove.size() > 0) {
            if (tm.getLocation() != null) {
              // since the totalVolumeReplacements counter was incremented, we should try this
              // again later after it's unassigned
              LOG.debug("Volume replacement needed for {} but it has a location {}.",
                  tm.getExtent(), tm.getLocation());
            } else if (tm.getOperationId() != null) {
              LOG.debug("Volume replacement needed for {} but it has an active operation {}.",
                  tm.getExtent(), tm.getOperationId());
            } else {
              LOG.debug("Volume replacement needed for {}.", tm.getExtent());
              // buffer replacements so that multiple mutations can be done at once
              tLists.volumeReplacements.add(volRep);
            }
          } else {
            LOG.debug("Volume replacement evaluation for {} returned no changes.", tm.getExtent());
          }
        } else {
          LOG.debug("Volume replacement needed for {} but its tablet state is {}.", tm.getExtent(),
              state);
        }
      }

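      // Both a future and a current location set at once indicates inconsistent
      // tablet metadata.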
      if (actions.contains(ManagementAction.BAD_STATE) && tm.isFutureAndCurrentLocationSet()) {
        Manager.log.error("{}, saw tablet with multiple locations, which should not happen",
            tm.getExtent());
        logIncorrectTabletLocations(tm);
        // take no further action for this tablet
        continue;
      }

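      // hasCurrent() distinguishes a tablet's current location from a future
      // location, i.e. an assignment that is still in progress.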
      final Location location = tm.getLocation();
      Location current = null;
      Location future = null;
      if (tm.hasCurrent()) {
        current = tm.getLocation();
      } else {
        future = tm.getLocation();
      }
      TabletLogger.missassigned(tm.getExtent(), goal.toString(), state.toString(),
          future != null ? future.getServerInstance() : null,
          current != null ? current.getServerInstance() : null, tm.getLogs().size());

      if (isFullScan) {
        stats.update(tableId, state);
      }

      if (Manager.log.isTraceEnabled()) {
        Manager.log.trace(
            "[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: {}, state: {}, goal: {} actions:{} #wals:{}",
            store.name(), tableMgmtParams.getServersToShutdown().equals(currentTServers.keySet()),
            dependentWatcher == null ? "null" : dependentWatcher.assignedOrHosted(), tm.getExtent(),
            state, goal, actions, tm.getLogs().size());
      }

      final boolean needsSplit = actions.contains(ManagementAction.NEEDS_SPLITTING);
      if (needsSplit) {
        LOG.debug("{} may need splitting.", tm.getExtent());
        manager.getSplitter().initiateSplit(tm.getExtent());
      }

      if (actions.contains(ManagementAction.NEEDS_COMPACTING) && compactionGenerator != null) {
        // Priority should be given to splits over compactions because it's best to
        // compact after a split
        if (!needsSplit) {
          var jobs = compactionGenerator.generateJobs(tm,
              TabletManagementIterator.determineCompactionKinds(actions));
          LOG.debug("{} may need compacting adding {} jobs", tm.getExtent(), jobs.size());
          manager.getCompactionCoordinator().addJobs(tm, jobs);
        } else {
          LOG.trace("skipping compaction job generation because {} may need splitting.",
              tm.getExtent());
        }
      }

      if (actions.contains(ManagementAction.NEEDS_LOCATION_UPDATE)
          || actions.contains(ManagementAction.NEEDS_RECOVERY)) {

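        // This tablet's server is still needed to host or unload it, so it is
        // not yet safe to include that server in the shutdown set.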
        if (tm.getLocation() != null) {
          filteredServersToShutdown.remove(tm.getLocation().getServerInstance());
        }

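        // Drive the tablet toward its goal: assign/host it when the goal is
        // HOSTED, otherwise suspend, unassign, or unload it.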
        if (goal == TabletGoalState.HOSTED) {

          // RecoveryManager.recoverLogs returns false once all of the logs have
          // been sorted and recovery can occur; while it returns true, sorting is
          // still in progress, so delay hosting the Tablet until it finishes.
          if ((state != TabletState.HOSTED && actions.contains(ManagementAction.NEEDS_RECOVERY))
              && manager.recoveryManager.recoverLogs(tm.getExtent(), tm.getLogs())) {
            LOG.debug("Not hosting {} as it needs recovery, logs: {}", tm.getExtent(),
                tm.getLogs().size());
            continue;
          }
          switch (state) {
            case ASSIGNED_TO_DEAD_SERVER:
              hostDeadTablet(tLists, tm, location);
              break;
            case SUSPENDED:
              hostSuspendedTablet(tLists, tm, location, tableConf);
              break;
            case UNASSIGNED:
              hostUnassignedTablet(tLists, tm.getExtent(),
                  new UnassignedTablet(location, tm.getLast()));
              break;
            case ASSIGNED:
              // Send another reminder
              tLists.assigned.add(new Assignment(tm.getExtent(),
                  future != null ? future.getServerInstance() : null, tm.getLast()));
              break;
            case HOSTED:
              break;
          }
        } else {
          switch (state) {
            case SUSPENDED:
              // Request a move to UNASSIGNED, so as to allow balancing to continue.
              tLists.suspendedToGoneServers.add(tm);
              break;
            case ASSIGNED_TO_DEAD_SERVER:
              unassignDeadTablet(tLists, tm);
              break;
            case HOSTED:
              TServerConnection client =
                  manager.tserverSet.getConnection(location.getServerInstance());
              if (client != null) {
                TABLET_UNLOAD_LOGGER.trace("[{}] Requesting TabletServer {} unload {} {}",
                    store.name(), location.getServerInstance(), tm.getExtent(), goal.howUnload());
                client.unloadTablet(manager.managerLock, tm.getExtent(), goal.howUnload(),
                    manager.getSteadyTime().getMillis());
                tableMgmtStats.totalUnloaded++;
                unloaded++;
              } else {
                Manager.log.warn("Could not connect to server {}", location);
              }
              break;
            case ASSIGNED:
            case UNASSIGNED:
              break;
          }
        }
      }
    }

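    // Flush any remaining buffered work to the tablet servers.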
    flushChanges(tLists);

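    // Only a full scan sees every tablet, so only then is the filtered shutdown
    // set published for use by the shutdown logic.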
    if (isFullScan) {
      this.filteredServersToShutdown = Set.copyOf(filteredServersToShutdown);
    }

    return tableMgmtStats;
  }