in core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java [385:554]
/**
 * Runs one balancing pass over the tables in {@code params.getTablesToBalance()}, writing any
 * proposed moves to {@code params.migrationsOut()}.
 *
 * <p>
 * The pass has three stages:
 * <ol>
 * <li>If more than {@code oobCheckMillis} has elapsed since the last out-of-bounds check for the
 * current data level, look for tablets hosted on servers outside their table's pool and queue
 * migrations for them. When any migration is queued, the method returns without balancing.</li>
 * <li>If there are outstanding migrations, return without balancing when their count has reached
 * {@code maxOutstandingMigrations}; otherwise adjust the online tablet counts for migrations
 * remembered from the previous pass that are still outstanding.</li>
 * <li>Delegate to the per-table balancer for each table, handing it only the tablet servers
 * grouped under that table's pool, until {@code maxTServerMigrations} migrations have been
 * collected.</li>
 * </ol>
 *
 * @param params current tablet server status, outstanding migrations, tables to balance and the
 *        output list for new migrations
 * @return the fixed value {@code 20_000} on every path (presumably a delay in milliseconds before
 *         the next pass; confirm against the caller)
 */
public long balance(BalanceParameters params) {
  long minBalanceTime = 20_000;
  // Iterate over the tables and balance each of them
  Map<String,TableId> tableIdMap = params.getTablesToBalance();
  Map<TableId,String> tableIdToTableName = tableIdMap.entrySet().stream()
      .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
  tableIdToTableName.keySet().forEach(this::checkTableConfig);

  long now = System.currentTimeMillis();

  HrtlbConf myConf = hrtlbConf.get();

  SortedMap<TabletServerId,TServerStatus> current = params.currentStatus();
  Set<TabletId> migrations = params.currentMigrations();
  List<TabletMigration> migrationsOut = params.migrationsOut();

  // Tablet servers keyed by pool name; each table is balanced only within its pool's view.
  Map<String,SortedMap<TabletServerId,TServerStatus>> currentGrouped =
      splitCurrentByRegex(params.currentStatus());
  final DataLevel currentLevel = DataLevel.valueOf(params.currentLevel());

  // The first pass for a data level records "now" as its last check time, so the out-of-bounds
  // scan is only run once oobCheckMillis has elapsed after that.
  if ((now
      - this.lastOOBCheckTimes.computeIfAbsent(currentLevel, (l) -> System.currentTimeMillis()))
      > myConf.oobCheckMillis) {
    try {
      // Check to see if a tablet is assigned outside the bounds of the pool. If so, migrate it.
      addOutOfBoundsMigrations(tableIdMap, current, currentGrouped, migrations, migrationsOut,
          myConf);
    } finally {
      // this could have taken a while...get a new time
      this.lastOOBCheckTimes.put(currentLevel, System.currentTimeMillis());
    }
  }

  // Out-of-bounds moves take priority: when any were queued, skip regular balancing this pass.
  if (!migrationsOut.isEmpty()) {
    LOG.warn("Not balancing tables due to moving {} out of bounds tablets", migrationsOut.size());
    LOG.info("Migrating out of bounds tablets: {}", migrationsOut);
    return minBalanceTime;
  }

  if (migrations != null && !migrations.isEmpty()) {
    if (migrations.size() >= myConf.maxOutstandingMigrations) {
      LOG.warn("Not balancing tables due to {} outstanding migrations", migrations.size());
      if (LOG.isTraceEnabled()) {
        LOG.trace("Sample up to 10 outstanding migrations: {}", limitTen(migrations));
      }
      return minBalanceTime;
    }

    LOG.debug("Current outstanding migrations of {} being applied", migrations.size());
    if (LOG.isTraceEnabled()) {
      LOG.trace("Sample up to 10 outstanding migrations: {}", limitTen(migrations));
    }

    // Forget remembered migrations that are no longer outstanding, then apply the rest to the
    // per-server online tablet counts: one fewer on the old server, one more on the new one.
    migrationsFromLastPass.keySet().retainAll(migrations);
    // NOTE(review): currentCopy is not read again in this method; presumably getTableInfo
    // updates status objects that are shared with currentGrouped - confirm.
    SortedMap<TabletServerId,TServerStatusImpl> currentCopy = new TreeMap<>();
    current.forEach((tid, status) -> currentCopy.put(tid, (TServerStatusImpl) status));
    Multimap<TabletServerId,String> serverTableIdCopied = HashMultimap.create();
    for (TabletMigration migration : migrationsFromLastPass.values()) {
      TableStatisticsImpl fromInfo = getTableInfo(currentCopy, serverTableIdCopied,
          migration.getTablet().getTable().canonical(), migration.getOldTabletServer());
      if (fromInfo != null) {
        fromInfo.setOnlineTabletCount(fromInfo.getOnlineTabletCount() - 1);
      }
      TableStatisticsImpl toInfo = getTableInfo(currentCopy, serverTableIdCopied,
          migration.getTablet().getTable().canonical(), migration.getNewTabletServer());
      if (toInfo != null) {
        toInfo.setOnlineTabletCount(toInfo.getOnlineTabletCount() + 1);
      }
    }
    // The outstanding migrations were accounted for above, so the per-table balancers are
    // handed an empty migration set.
    migrations = EMPTY_MIGRATIONS;
  } else {
    migrationsFromLastPass.clear();
  }

  Timer balanceTimer = Timer.startNew();
  for (TableId tableId : tableIdMap.values()) {
    String tableName = tableIdToTableName.get(tableId);
    String regexTableName = getPoolNameForTable(tableName);
    SortedMap<TabletServerId,TServerStatus> currentView = currentGrouped.get(regexTableName);
    if (currentView == null) {
      LOG.warn("Skipping balance for table {} as no tablet servers are online.", tableName);
      continue;
    }
    ArrayList<TabletMigration> newMigrations = new ArrayList<>();
    balanceTimer.restart();
    getBalancerForTable(tableId)
        .balance(new BalanceParamsImpl(currentView, params.currentResourceGroups(), migrations,
            newMigrations, DataLevel.of(tableId), Map.of(tableName, tableId)));
    LOG.trace("balance results tableId:{} migrations:{} time:{}ms", tableId, newMigrations.size(),
        balanceTimer.elapsed(TimeUnit.MILLISECONDS));

    // Remember when a table first produced migrations; warn once it has kept producing them
    // for more than an hour without an empty pass in between.
    if (newMigrations.isEmpty()) {
      tableToTimeSinceNoMigrations.remove(tableId);
    } else if (tableToTimeSinceNoMigrations.containsKey(tableId)) {
      if ((now - tableToTimeSinceNoMigrations.get(tableId)) > HOURS.toMillis(1)) {
        LOG.warn("We have been consistently producing migrations for {}: {}", tableName,
            limitTen(newMigrations));
      }
    } else {
      tableToTimeSinceNoMigrations.put(tableId, now);
    }

    migrationsOut.addAll(newMigrations);
    if (migrationsOut.size() >= myConf.maxTServerMigrations) {
      MIGRATIONS_LOGGER.debug("Table {} migration size : {} is over tserver migration max: {}",
          tableName, migrationsOut.size(), myConf.maxTServerMigrations);
      break;
    }
  }

  // Remember what was proposed so the next pass can account for moves still in flight.
  for (TabletMigration migration : migrationsOut) {
    migrationsFromLastPass.put(migration.getTablet(), migration);
  }

  LOG.info("Migrating {} tablets for balance.", migrationsOut.size());
  LOG.debug("Tablets currently migrating: {}", migrationsOut);
  return minBalanceTime;
}

/**
 * Queues a migration for every online tablet found on a tablet server that is not a member of
 * its table's pool, choosing a random destination among the servers grouped under that pool.
 *
 * <p>
 * Tablets that already appear in {@code migrations} are skipped. The scan stops as soon as
 * {@code migrationsOut} holds {@code conf.maxTServerMigrations} entries, matching the limit the
 * balancing loop in {@link #balance(BalanceParameters)} applies.
 *
 * @param tableIdMap table name to table id for the tables being balanced
 * @param current status of every tablet server under consideration
 * @param currentGrouped tablet servers keyed by pool name
 * @param migrations tablets with an outstanding migration; may be {@code null}
 * @param migrationsOut list that receives the new migrations
 * @param conf balancer configuration supplying the migration limit
 */
private void addOutOfBoundsMigrations(Map<String,TableId> tableIdMap,
    SortedMap<TabletServerId,TServerStatus> current,
    Map<String,SortedMap<TabletServerId,TServerStatus>> currentGrouped, Set<TabletId> migrations,
    List<TabletMigration> migrationsOut, HrtlbConf conf) {
  for (Entry<String,TableId> tableEntry : tableIdMap.entrySet()) {
    String table = tableEntry.getKey();
    LOG.debug("Checking for out of bounds tablets for table {}", table);
    // pool names are the same as table names, except in the DEFAULT case.
    String tablePoolName = getPoolNameForTable(table);
    TableId tid = tableEntry.getValue();
    if (tid == null) {
      LOG.warn("Unable to check for out of bounds tablets for table {},"
          + " it may have been deleted or renamed.", table);
      continue;
    }
    // Candidate destinations are the same for every tablet of this table.
    SortedMap<TabletServerId,TServerStatus> poolServers = currentGrouped.get(tablePoolName);

    for (Entry<TabletServerId,TServerStatus> server : current.entrySet()) {
      // If this table is assigned to a pool for this host, then move on.
      List<String> hostPools = getPoolNamesForHost(server.getKey());
      if (hostPools.contains(tablePoolName)) {
        continue;
      }
      try {
        List<TabletStatistics> outOfBoundsTablets =
            getOnlineTabletsForTable(server.getKey(), tid);
        if (outOfBoundsTablets == null) {
          continue;
        }
        for (TabletStatistics ts : outOfBoundsTablets) {
          if (migrations != null && migrations.contains(ts.getTabletId())) {
            LOG.debug("Migration for out of bounds tablet {} has already been requested",
                ts.getTabletId());
            continue;
          }
          // An empty view is treated like a missing one: nextInt(0) would throw.
          if (poolServers == null || poolServers.isEmpty()) {
            LOG.warn("No tablet servers online for pool {}, unable to"
                + " migrate out of bounds tablets", tablePoolName);
            continue;
          }
          // Pick a uniformly random member of the pool by advancing the key iterator.
          int skip = RANDOM.get().nextInt(poolServers.size());
          Iterator<TabletServerId> iter = poolServers.keySet().iterator();
          for (int i = 0; i < skip; i++) {
            iter.next();
          }
          TabletServerId nextTS = iter.next();
          LOG.info(
              "Tablet {} is currently outside the bounds of the"
                  + " regex, migrating from {} to {}",
              ts.getTabletId(), server.getKey(), nextTS);
          migrationsOut.add(new TabletMigration(ts.getTabletId(), server.getKey(), nextTS));
          if (migrationsOut.size() >= conf.maxTServerMigrations) {
            // Limit reached: end the whole scan, not just this server's tablets.
            return;
          }
        }
      } catch (AccumuloException | AccumuloSecurityException ex) {
        // Pass the exception last so the logger records its stack trace.
        LOG.error("Error in OOB check getting tablets for table {} from server {}", tid,
            server.getKey().getHost(), ex);
      }
    }
  }
}