in server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java [427:700]
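/**
 * Examines each tablet reported by the state store iterator, compares the tablet's current
 * state to its goal state, and buffers the resulting actions (assignments, unloads,
 * suspensions, volume replacements) so they can be flushed in batches.
 */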
private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,
TabletManagementParameters tableMgmtParams,
SortedMap<TServerInstance,TabletServerStatus> currentTServers, boolean isFullScan)
throws TException, DistributedStoreException, WalMarkerException, IOException {
final TableMgmtStats tableMgmtStats = new TableMgmtStats();
final boolean shuttingDownAllTabletServers =
tableMgmtParams.getServersToShutdown().equals(currentTServers.keySet());
if (shuttingDownAllTabletServers && !isFullScan) {
// If we are shutting down all of the TabletServers, then don't process any events
// from the EventCoordinator.
LOG.debug("Partial scan requested, but aborted due to shutdown of all TabletServers");
return tableMgmtStats;
}
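// Counts tablets unloaded during this pass; together with tLists.unassigned it bounds how
// much work is buffered before flushChanges() is called below.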
int unloaded = 0;
TabletLists tLists = new TabletLists(currentTServers, tableMgmtParams.getGroupedTServers(),
tableMgmtParams.getServersToShutdown());
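// Generates compaction jobs for tablets flagged NEEDS_COMPACTING. Set to null below if the
// compaction configuration fails validation, which pauses job generation until it is fixed.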
CompactionJobGenerator compactionGenerator =
new CompactionJobGenerator(new ServiceEnvironmentImpl(manager.getContext()),
tableMgmtParams.getCompactionHints(), tableMgmtParams.getSteadyTime());
try {
CheckCompactionConfig.validate(manager.getConfiguration(), Level.TRACE);
this.metrics.clearCompactionServiceConfigurationError();
} catch (RuntimeException | ReflectiveOperationException e) {
this.metrics.setCompactionServiceConfigurationError();
LOG.error(
"Error validating compaction configuration, all {} compactions are paused until the configuration is fixed.",
store.getLevel(), e);
compactionGenerator = null;
}
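// Start from the full shutdown set; servers still hosting tablets that need location updates
// or recovery are removed below so their shutdown is deferred.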
Set<TServerInstance> filteredServersToShutdown =
new HashSet<>(tableMgmtParams.getServersToShutdown());
while (iter.hasNext()) {
final TabletManagement mti = iter.next();
if (mti == null) {
throw new IllegalStateException("State store returned a null ManagerTabletInfo object");
}
final TabletMetadata tm = mti.getTabletMetadata();
final String mtiError = mti.getErrorMessage();
if (mtiError != null) {
// An error happened on the TabletServer in the TabletManagementIterator
// when trying to process this extent.
LOG.warn(
"Error on TabletServer trying to get Tablet management information for extent: {}. Error message: {}",
tm.getExtent(), mtiError);
this.metrics.incrementTabletGroupWatcherError(this.store.getLevel());
tableMgmtStats.tabletsWithErrors++;
continue;
}
final TableId tableId = tm.getTableId();
// ignore entries for tables that do not exist in zookeeper
if (manager.getTableManager().getTableState(tableId) == null) {
continue;
}
// Don't overwhelm the tablet servers with work
if (tLists.unassigned.size() + unloaded
> Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()
|| tLists.volumeReplacements.size() > 1000) {
flushChanges(tLists);
tLists.reset();
unloaded = 0;
}
final TableConfiguration tableConf = manager.getContext().getTableConfiguration(tableId);
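// Compute the tablet's state relative to the set of tablet servers that were live when this
// scan started.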
TabletState state = TabletState.compute(tm, currentTServers.keySet());
if (state == TabletState.ASSIGNED_TO_DEAD_SERVER) {
/*
* This code exists to deal with a race condition caused by two threads running in this
* class that compute tablet actions. One thread does full scans and the other reacts to
* events and does partial scans. Below is an example of the race condition this is
* handling.
*
* - TGW Thread 1 : reads the set of tablet servers and it's empty
*
* - TGW Thread 2 : reads the set of tablet servers and it's [TS1]
*
* - TGW Thread 2 : Sees tabletX without a location and assigns it to TS1
*
* - TGW Thread 1 : Sees tabletX assigned to TS1 and assumes it's assigned to a dead tablet
* server because its set of live servers is the empty set.
*
* To deal with this race condition, this code recomputes the tablet state using the latest
* tservers when a tablet is seen assigned to a dead tserver.
*/
TabletState newState = TabletState.compute(tm, manager.tserversSnapshot().getTservers());
if (newState != state) {
LOG.debug("Tablet state changed when using latest set of tservers {} {} {}",
tm.getExtent(), state, newState);
state = newState;
}
}
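// Tally this tablet under its (possibly recomputed) state for the management stats.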
tableMgmtStats.counts[state.ordinal()]++;
// This is final because nothing in this method should change the goal. All computation of the
// goal should be done in TabletGoalState.compute() so that all parts of the Accumulo code
// will compute a consistent goal.
final TabletGoalState goal =
TabletGoalState.compute(tm, state, manager.tabletBalancer, tableMgmtParams);
final Set<ManagementAction> actions = mti.getActions();
if (actions.contains(ManagementAction.NEEDS_RECOVERY) && goal != TabletGoalState.HOSTED) {
LOG.warn("Tablet has wals, but goal is not hosted. Tablet: {}, goal:{}", tm.getExtent(),
goal);
}
if (actions.contains(ManagementAction.NEEDS_VOLUME_REPLACEMENT)) {
tableMgmtStats.totalVolumeReplacements++;
if (state == TabletState.UNASSIGNED || state == TabletState.SUSPENDED) {
var volRep =
VolumeUtil.computeVolumeReplacements(tableMgmtParams.getVolumeReplacements(), tm);
if (volRep.logsToRemove.size() + volRep.filesToRemove.size() > 0) {
if (tm.getLocation() != null) {
// since the totalVolumeReplacements counter was incremented, we should try this
// again later after it's unassigned
LOG.debug("Volume replacement needed for {} but it has a location {}.",
tm.getExtent(), tm.getLocation());
} else if (tm.getOperationId() != null) {
LOG.debug("Volume replacement needed for {} but it has an active operation {}.",
tm.getExtent(), tm.getOperationId());
} else {
LOG.debug("Volume replacement needed for {}.", tm.getExtent());
// buffer replacements so that multiple mutations can be done at once
tLists.volumeReplacements.add(volRep);
}
} else {
LOG.debug("Volume replacement evaluation for {} returned no changes.", tm.getExtent());
}
} else {
LOG.debug("Volume replacement needed for {} but its tablet state is {}.", tm.getExtent(),
state);
}
}
if (actions.contains(ManagementAction.BAD_STATE) && tm.isFutureAndCurrentLocationSet()) {
Manager.log.error("{}, saw tablet with multiple locations, which should not happen",
tm.getExtent());
logIncorrectTabletLocations(tm);
// take no further action for this tablet
continue;
}
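// A tablet's stored location is its current location only when hasCurrent() is true;
// otherwise any location present is a future (pending assignment) location.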
final Location location = tm.getLocation();
Location current = null;
Location future = null;
if (tm.hasCurrent()) {
current = tm.getLocation();
} else {
future = tm.getLocation();
}
TabletLogger.missassigned(tm.getExtent(), goal.toString(), state.toString(),
future != null ? future.getServerInstance() : null,
current != null ? current.getServerInstance() : null, tm.getLogs().size());
if (isFullScan) {
stats.update(tableId, state);
}
if (Manager.log.isTraceEnabled()) {
Manager.log.trace(
"[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: {}, state: {}, goal: {} actions:{} #wals:{}",
store.name(), tableMgmtParams.getServersToShutdown().equals(currentTServers.keySet()),
dependentWatcher == null ? "null" : dependentWatcher.assignedOrHosted(), tm.getExtent(),
state, goal, actions, tm.getLogs().size());
}
final boolean needsSplit = actions.contains(ManagementAction.NEEDS_SPLITTING);
if (needsSplit) {
LOG.debug("{} may need splitting.", tm.getExtent());
manager.getSplitter().initiateSplit(tm.getExtent());
}
if (actions.contains(ManagementAction.NEEDS_COMPACTING) && compactionGenerator != null) {
// Check if the tablet needs splitting; priority should be given to splits over
// compactions because it's best to compact after a split
if (!needsSplit) {
var jobs = compactionGenerator.generateJobs(tm,
TabletManagementIterator.determineCompactionKinds(actions));
LOG.debug("{} may need compacting adding {} jobs", tm.getExtent(), jobs.size());
manager.getCompactionCoordinator().addJobs(tm, jobs);
} else {
LOG.trace("skipping compaction job generation because {} may need splitting.",
tm.getExtent());
}
}
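// Handle tablets whose location must change or that need WAL recovery. A server that still
// hosts such a tablet is removed from the filtered shutdown set so it is not stopped while
// the tablet depends on it.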
if (actions.contains(ManagementAction.NEEDS_LOCATION_UPDATE)
|| actions.contains(ManagementAction.NEEDS_RECOVERY)) {
if (tm.getLocation() != null) {
filteredServersToShutdown.remove(tm.getLocation().getServerInstance());
}
if (goal == TabletGoalState.HOSTED) {
// RecoveryManager.recoverLogs will return false when all of the logs
// have been sorted so that recovery can occur. Delay the hosting of
// the Tablet until the sorting is finished.
if ((state != TabletState.HOSTED && actions.contains(ManagementAction.NEEDS_RECOVERY))
&& manager.recoveryManager.recoverLogs(tm.getExtent(), tm.getLogs())) {
LOG.debug("Not hosting {} as it needs recovery, logs: {}", tm.getExtent(),
tm.getLogs().size());
continue;
}
switch (state) {
case ASSIGNED_TO_DEAD_SERVER:
hostDeadTablet(tLists, tm, location);
break;
case SUSPENDED:
hostSuspendedTablet(tLists, tm, location, tableConf);
break;
case UNASSIGNED:
hostUnassignedTablet(tLists, tm.getExtent(),
new UnassignedTablet(location, tm.getLast()));
break;
case ASSIGNED:
// Send another assignment reminder to the tablet server
tLists.assigned.add(new Assignment(tm.getExtent(),
future != null ? future.getServerInstance() : null, tm.getLast()));
break;
case HOSTED:
break;
}
} else {
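// The goal is not HOSTED: depending on the current state, request an unload, clean up a
// dead-server assignment, or move a suspended tablet to UNASSIGNED.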
switch (state) {
case SUSPENDED:
// Request a move to UNASSIGNED, so as to allow balancing to continue.
tLists.suspendedToGoneServers.add(tm);
break;
case ASSIGNED_TO_DEAD_SERVER:
unassignDeadTablet(tLists, tm);
break;
case HOSTED:
TServerConnection client =
manager.tserverSet.getConnection(location.getServerInstance());
if (client != null) {
TABLET_UNLOAD_LOGGER.trace("[{}] Requesting TabletServer {} unload {} {}",
store.name(), location.getServerInstance(), tm.getExtent(), goal.howUnload());
client.unloadTablet(manager.managerLock, tm.getExtent(), goal.howUnload(),
manager.getSteadyTime().getMillis());
tableMgmtStats.totalUnloaded++;
unloaded++;
} else {
Manager.log.warn("Could not connect to server {}", location);
}
break;
case ASSIGNED:
case UNASSIGNED:
break;
}
}
}
}
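// Flush any remaining buffered assignments, unloads, suspensions, and volume replacements.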
flushChanges(tLists);
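// A partial scan may not have visited every tablet, so the filtered shutdown set is only
// published after a full scan.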
if (isFullScan) {
this.filteredServersToShutdown = Set.copyOf(filteredServersToShutdown);
}
return tableMgmtStats;
}