public long balance()

in core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java [385:554]


  /**
   * Runs one balancing pass over the tables in {@code params.getTablesToBalance()}.
   *
   * <p>
   * The pass has three stages:
   * <ol>
   * <li>If more than {@code oobCheckMillis} has elapsed since the last out-of-bounds check for the
   * current data level, look for tablets hosted on servers that are not in their table's pool and
   * queue a migration for each one to a randomly chosen server in the pool. If any such migration
   * is queued, the method returns without doing any further balancing.</li>
   * <li>If there are outstanding migrations, either return early (when their number has reached
   * {@code maxOutstandingMigrations}) or adjust the online tablet counts for the migrations that
   * were issued on the previous pass and are still outstanding.</li>
   * <li>Delegate to the per-table balancer for every table, restricted to the tablet servers of
   * that table's pool, and collect the proposed migrations until {@code maxTServerMigrations} is
   * reached.</li>
   * </ol>
   *
   * @param params the balancing inputs; proposed migrations are appended to
   *        {@code params.migrationsOut()}
   * @return always {@code 20_000}; presumably the delay in milliseconds before the next balancing
   *         pass (confirm against the balancer SPI)
   */
  public long balance(BalanceParameters params) {
    long minBalanceTime = 20_000;
    // Iterate over the tables and balance each of them
    Map<String,TableId> tableIdMap = params.getTablesToBalance();
    // Reverse lookup (table id -> table name) used for configuration checks and log messages.
    Map<TableId,String> tableIdToTableName = tableIdMap.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
    tableIdToTableName.keySet().forEach(this::checkTableConfig);

    long now = System.currentTimeMillis();

    HrtlbConf myConf = hrtlbConf.get();

    SortedMap<TabletServerId,TServerStatus> current = params.currentStatus();
    Set<TabletId> migrations = params.currentMigrations();
    List<TabletMigration> migrationsOut = params.migrationsOut();

    // Tablet servers grouped by the name of the pool they belong to.
    Map<String,SortedMap<TabletServerId,TServerStatus>> currentGrouped =
        splitCurrentByRegex(params.currentStatus());
    final DataLevel currentLevel = DataLevel.valueOf(params.currentLevel());

    // The first time a level is seen its last-check time is initialized to "now", so the
    // out-of-bounds check is skipped until oobCheckMillis has elapsed.
    if ((now
        - this.lastOOBCheckTimes.computeIfAbsent(currentLevel, (l) -> System.currentTimeMillis()))
        > myConf.oobCheckMillis) {
      try {
        // Check to see if a tablet is assigned outside the bounds of the pool. If so, migrate it.
        for (String table : tableIdMap.keySet()) {
          LOG.debug("Checking for out of bounds tablets for table {}", table);
          String tablePoolName = getPoolNameForTable(table);
          // Candidate destination servers for this table; resolved once per table because
          // neither the pool name nor the grouping changes while the table is being checked.
          SortedMap<TabletServerId,TServerStatus> poolServers = currentGrouped.get(tablePoolName);
          for (Entry<TabletServerId,TServerStatus> e : current.entrySet()) {
            // pool names are the same as table names, except in the DEFAULT case.
            // If this table is assigned to a pool for this host, then move on.
            List<String> hostPools = getPoolNamesForHost(e.getKey());
            if (hostPools.contains(tablePoolName)) {
              continue;
            }
            TableId tid = tableIdMap.get(table);
            if (tid == null) {
              LOG.warn("Unable to check for out of bounds tablets for table {},"
                  + " it may have been deleted or renamed.", table);
              continue;
            }
            try {
              List<TabletStatistics> outOfBoundsTablets = getOnlineTabletsForTable(e.getKey(), tid);
              if (outOfBoundsTablets == null) {
                continue;
              }
              for (TabletStatistics ts : outOfBoundsTablets) {
                // migrations is null-checked further down, so guard against null here as well.
                if (migrations != null && migrations.contains(ts.getTabletId())) {
                  LOG.debug("Migration for out of bounds tablet {} has already been requested",
                      ts.getTabletId());
                  continue;
                }
                if (poolServers != null) {
                  // Pick a uniformly random server from the pool as the destination.
                  int skip = RANDOM.get().nextInt(poolServers.size());
                  Iterator<TabletServerId> iter = poolServers.keySet().iterator();
                  for (int i = 0; i < skip; i++) {
                    iter.next();
                  }
                  TabletServerId nextTS = iter.next();
                  LOG.info(
                      "Tablet {} is currently outside the bounds of the"
                          + " regex, migrating from {} to {}",
                      ts.getTabletId(), e.getKey(), nextTS);
                  migrationsOut.add(new TabletMigration(ts.getTabletId(), e.getKey(), nextTS));
                  // NOTE(review): this break only leaves the per-tablet loop; the server and
                  // table loops continue, so migrationsOut can still grow past
                  // maxTServerMigrations. Confirm whether that is intended.
                  if (migrationsOut.size() >= myConf.maxTServerMigrations) {
                    break;
                  }
                } else {
                  LOG.warn("No tablet servers online for pool {}, unable to"
                      + " migrate out of bounds tablets", tablePoolName);
                }
              }
            } catch (AccumuloException | AccumuloSecurityException e1) {
              // Pass the caught exception (not the loop entry) as the trailing argument so the
              // logger records its message and stack trace.
              LOG.error("Error in OOB check getting tablets for table {} from server {}", tid,
                  e.getKey().getHost(), e1);
            }
          }
        }
      } finally {
        // this could have taken a while...get a new time
        this.lastOOBCheckTimes.put(currentLevel, System.currentTimeMillis());
      }
    }

    // Out-of-bounds migrations take priority: when any were queued, skip regular balancing.
    if (!migrationsOut.isEmpty()) {
      LOG.warn("Not balancing tables due to moving {} out of bounds tablets", migrationsOut.size());
      LOG.info("Migrating out of bounds tablets: {}", migrationsOut);
      return minBalanceTime;
    }

    if (migrations != null && !migrations.isEmpty()) {
      if (migrations.size() >= myConf.maxOutstandingMigrations) {
        LOG.warn("Not balancing tables due to {} outstanding migrations", migrations.size());
        if (LOG.isTraceEnabled()) {
          LOG.trace("Sample up to 10 outstanding migrations: {}", limitTen(migrations));
        }
        return minBalanceTime;
      }

      LOG.debug("Current outstanding migrations of {} being applied", migrations.size());
      if (LOG.isTraceEnabled()) {
        LOG.trace("Sample up to 10 outstanding migrations: {}", limitTen(migrations));
      }
      // Keep only the migrations from the previous pass that are still outstanding.
      migrationsFromLastPass.keySet().retainAll(migrations);
      SortedMap<TabletServerId,TServerStatusImpl> currentCopy = new TreeMap<>();
      current.forEach((tid, status) -> currentCopy.put(tid, (TServerStatusImpl) status));
      Multimap<TabletServerId,String> serverTableIdCopied = HashMultimap.create();
      // Adjust the online tablet counts as if each still-outstanding migration had completed:
      // one tablet fewer on the source server, one more on the destination.
      for (TabletMigration migration : migrationsFromLastPass.values()) {
        TableStatisticsImpl fromInfo = getTableInfo(currentCopy, serverTableIdCopied,
            migration.getTablet().getTable().canonical(), migration.getOldTabletServer());
        if (fromInfo != null) {
          fromInfo.setOnlineTabletCount(fromInfo.getOnlineTabletCount() - 1);
        }
        TableStatisticsImpl toInfo = getTableInfo(currentCopy, serverTableIdCopied,
            migration.getTablet().getTable().canonical(), migration.getNewTabletServer());
        if (toInfo != null) {
          toInfo.setOnlineTabletCount(toInfo.getOnlineTabletCount() + 1);
        }
      }
      // The per-table balancers below are handed an empty set of outstanding migrations.
      migrations = EMPTY_MIGRATIONS;
    } else {
      migrationsFromLastPass.clear();
    }

    Timer balanceTimer = Timer.startNew();
    for (TableId tableId : tableIdMap.values()) {
      String tableName = tableIdToTableName.get(tableId);
      String regexTableName = getPoolNameForTable(tableName);
      // Only the tablet servers in this table's pool are offered to the table's balancer.
      SortedMap<TabletServerId,TServerStatus> currentView = currentGrouped.get(regexTableName);
      if (currentView == null) {
        LOG.warn("Skipping balance for table {} as no tablet servers are online.", tableName);
        continue;
      }
      ArrayList<TabletMigration> newMigrations = new ArrayList<>();
      balanceTimer.restart();
      getBalancerForTable(tableId)
          .balance(new BalanceParamsImpl(currentView, params.currentResourceGroups(), migrations,
              newMigrations, DataLevel.of(tableId), Map.of(tableName, tableId)));
      LOG.trace("balance results tableId:{} migrations:{} time:{}ms", tableId, newMigrations.size(),
          balanceTimer.elapsed(TimeUnit.MILLISECONDS));

      // Track when a table started producing migrations and warn once it has been doing so
      // for more than an hour without an empty pass in between.
      if (newMigrations.isEmpty()) {
        tableToTimeSinceNoMigrations.remove(tableId);
      } else if (tableToTimeSinceNoMigrations.containsKey(tableId)) {
        if ((now - tableToTimeSinceNoMigrations.get(tableId)) > HOURS.toMillis(1)) {
          LOG.warn("We have been consistently producing migrations for {}: {}", tableName,
              limitTen(newMigrations));
        }
      } else {
        tableToTimeSinceNoMigrations.put(tableId, now);
      }

      migrationsOut.addAll(newMigrations);
      // Stop visiting further tables once the migration cap for this pass has been reached.
      if (migrationsOut.size() >= myConf.maxTServerMigrations) {
        MIGRATIONS_LOGGER.debug("Table {} migration size : {} is over tserver migration max: {}",
            tableName, migrationsOut.size(), myConf.maxTServerMigrations);
        break;
      }
    }

    // Remember what was issued on this pass so the next pass can account for it.
    for (TabletMigration migration : migrationsOut) {
      migrationsFromLastPass.put(migration.getTablet(), migration);
    }

    LOG.info("Migrating {} tablets for balance.", migrationsOut.size());
    LOG.debug("Tablets currently migrating: {}", migrationsOut);
    return minBalanceTime;
  }