protected List balanceTable()

in hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java [256:463]


  /**
   * Computes region move plans for one server-to-regions assignment map, trying to bring every
   * server's region count into the range [min, max], where {@code min} is the average load
   * truncated to an int and {@code max} is the average load rounded up.
   * <p>
   * Outline of the passes performed below:
   * <ol>
   * <li>Return {@code null} early when neither {@code needsBalance(c)} nor
   * {@code overallNeedsBalance()} reports that balancing is needed.</li>
   * <li>Walk servers from most to least loaded; for each server above {@code max}, sort its
   * regions with {@code riComparator} and queue up to {@code load - max} of them as plans that
   * have no destination yet.</li>
   * <li>Collect the servers below {@code min} and hand queued regions to them one at a time,
   * sweeping back and forth over a shuffled list of those servers.</li>
   * <li>If servers are still short of {@code min}, queue one more region per server while walking
   * down from the most loaded, then fill the servers that are still below {@code min}.</li>
   * <li>If {@code min != max}, pass the remaining state to {@code balanceOverall}.</li>
   * </ol>
   * @param tableName      the table being balanced; not referenced in this method body
   * @param loadOfOneTable map from server to the regions it currently hosts (presumably only the
   *                       regions of {@code tableName}; confirm against the caller)
   * @return {@code null} if no balancing is needed, otherwise the list of region plans that were
   *         produced (may be empty)
   */
  protected List<RegionPlan> balanceTable(TableName tableName,
    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
    long startTime = EnvironmentEdgeManager.currentTime();

    // construct a Cluster object with clusterMap and rest of the
    // argument as defaults
    BalancerClusterState c =
      new BalancerClusterState(loadOfOneTable, null, this.regionFinder, this.rackManager);
    // Nothing to do only when BOTH checks say no balancing is required.
    if (!needsBalance(c) && !this.overallNeedsBalance()) {
      return null;
    }
    ClusterLoadState cs = new ClusterLoadState(loadOfOneTable);
    int numServers = cs.getNumServers();
    NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
    int numRegions = cs.getNumRegions();
    float average = cs.getLoadAverage();
    // Target band for per-server region count: [min, max]. They are equal when the average is a
    // whole number, otherwise max == min + 1.
    int max = (int) Math.ceil(average);
    int min = (int) average;

    // Using to check balance result.
    StringBuilder strBalanceParam = new StringBuilder();
    strBalanceParam.append("Balance parameter: numRegions=").append(numRegions)
      .append(", numServers=").append(numServers).append(", max=").append(max).append(", min=")
      .append(min);
    LOG.debug(strBalanceParam.toString());

    // Balance the cluster
    // TODO: Look at data block locality or a more complex load to do this
    // regionsToMove holds plans that still need a destination (created with a null destination
    // below); regionsToReturn collects the plans handed back to the caller.
    MinMaxPriorityQueue<RegionPlan> regionsToMove =
      MinMaxPriorityQueue.orderedBy(rpComparator).create();
    List<RegionPlan> regionsToReturn = new ArrayList<>();

    // Walk down most loaded, pruning each to the max
    int serversOverloaded = 0;
    // flag used to fetch regions from head and tail of list, alternately
    // NOTE(review): within this method the flag is only ever assigned false, and it is passed to
    // the helpers by value, so the "tail" branches below are never taken here.
    boolean fetchFromTail = false;
    // Per-server bookkeeping. Every server in serversByLoad gets an entry in this first pass.
    Map<ServerName, BalanceInfo> serverBalanceInfo = new TreeMap<>();
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.descendingMap()
      .entrySet()) {
      ServerAndLoad sal = server.getKey();
      int load = sal.getLoad();
      if (load <= max) {
        // Not overloaded: record an entry with zeroed counters and move on.
        serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0, server.getValue()));
        continue;
      }
      serversOverloaded++;
      List<RegionInfo> regions = server.getValue();
      int numToOffload = Math.min(load - max, regions.size());
      // account for the out-of-band regions which were assigned to this server
      // after some other region server crashed
      // Note: this sorts the server's region list in place.
      Collections.sort(regions, riComparator);
      int numTaken = 0;
      // The loop header never ends the loop in practice; the break on numTaken does. The index is
      // advanced inside the body, after the region has been read.
      for (int i = 0; i <= numToOffload;) {
        RegionInfo hri = regions.get(i); // fetch from head
        if (fetchFromTail) {
          hri = regions.get(regions.size() - 1 - i);
        }
        i++;
        regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
        numTaken++;
        if (numTaken >= numToOffload) {
          break;
        }
      }
      // Presumably the arguments are (next region index to unload, net regions added, region
      // list), judging by the getters used later in this method; confirm against BalanceInfo.
      serverBalanceInfo.put(sal.getServerName(),
        new BalanceInfo(numToOffload, -numTaken, server.getValue()));
    }
    int totalNumMoved = regionsToMove.size();

    // Walk down least loaded, filling each to the min
    int neededRegions = 0; // number of regions needed to bring all up to min
    fetchFromTail = false;

    // Maps each underloaded server to the number of regions it still needs to reach min.
    Map<ServerName, Integer> underloadedServers = new HashMap<>();
    // Budget for the deficits recorded below; the scan stops only when it reaches exactly zero.
    int maxToTake = numRegions - min;
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.entrySet()) {
      if (maxToTake == 0) {
        break; // no more to take
      }
      int load = server.getKey().getLoad();
      if (load >= min) {
        continue; // look for other servers which haven't reached min
      }
      int regionsToPut = min - load;
      maxToTake -= regionsToPut;
      underloadedServers.put(server.getKey().getServerName(), regionsToPut);
    }
    // number of servers that get new regions
    int serversUnderloaded = underloadedServers.size();
    // Sweep direction over sns: +1 walks forward, -1 walks backward.
    int incr = 1;
    // Snapshot the underloaded servers into a list and randomize the visiting order.
    List<ServerName> sns =
      Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
    Collections.shuffle(sns);
    // Give out one queued region per underloaded server per sweep, reversing direction after each
    // sweep, until the queue is empty or a full sweep assigns nothing.
    while (regionsToMove.size() > 0) {
      int cnt = 0;
      // underloadedServers.size() equals sns.size() here: the put() below only updates keys that
      // are already present.
      int i = incr > 0 ? 0 : underloadedServers.size() - 1;
      for (; i >= 0 && i < underloadedServers.size(); i += incr) {
        if (regionsToMove.isEmpty()) {
          break;
        }
        ServerName si = sns.get(i);
        int numToTake = underloadedServers.get(si);
        if (numToTake == 0) {
          continue;
        }

        // Helper not shown here; presumably it takes one plan out of regionsToMove, sets si as its
        // destination and appends it to regionsToReturn. Confirm against addRegionPlan.
        addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);

        underloadedServers.put(si, numToTake - 1);
        cnt++;
        BalanceInfo bi = serverBalanceInfo.get(si);
        bi.setNumRegionsAdded(bi.getNumRegionsAdded() + 1);
      }
      if (cnt == 0) {
        // No server accepted a region in this sweep; stop to avoid looping forever.
        break;
      }
      // iterates underloadedServers in the other direction
      incr = -incr;
    }
    for (Integer i : underloadedServers.values()) {
      // If we still want to take some, increment needed
      neededRegions += i;
    }

    // Need to do a second pass.
    // Either more regions to assign out or servers that are still underloaded

    // If we need more to fill min, grab one from each most loaded until enough
    if (neededRegions != 0) {
      // Walk down most loaded, grabbing one from each until we get enough
      for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.descendingMap()
        .entrySet()) {
        BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
        // NOTE(review): the null check here is not repeated before the setters further down. The
        // first pass stores an entry for every server, so a null lookup is not expected.
        int idx = balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
        if (idx >= server.getValue().size()) {
          // This server has no region left at the unload index; the whole walk stops here.
          break;
        }
        RegionInfo region = server.getValue().get(idx);
        if (region.isMetaRegion()) {
          continue; // Don't move meta regions.
        }
        regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
        // One region leaves this server: lower its net-added count and advance its unload index.
        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() - 1);
        balanceInfo.setNextRegionForUnload(balanceInfo.getNextRegionForUnload() + 1);
        totalNumMoved++;
        if (--neededRegions == 0) {
          // No more regions needed, done shedding
          break;
        }
      }
    }

    // Now we have a set of regions that must be all assigned out
    // Assign each underloaded up to the min, then if leftovers, assign to max

    // Walk down least loaded, assigning to each to fill up to min
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.entrySet()) {
      int regionCount = server.getKey().getLoad();
      if (regionCount >= min) {
        // Iteration is in ascending load order, so no later server started below min either.
        break;
      }
      BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
      if (balanceInfo != null) {
        // Include the regions already assigned to this server by the earlier passes.
        regionCount += balanceInfo.getNumRegionsAdded();
      }
      if (regionCount >= min) {
        continue;
      }
      int numToTake = min - regionCount;
      int numTaken = 0;
      while (numTaken < numToTake && 0 < regionsToMove.size()) {
        addRegionPlan(regionsToMove, fetchFromTail, server.getKey().getServerName(),
          regionsToReturn);
        numTaken++;
        balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() + 1);
      }
    }

    // When min == max every server has a single exact target, so this extra step is skipped.
    // The helper's behavior is not visible here.
    if (min != max) {
      balanceOverall(regionsToReturn, serverBalanceInfo, fetchFromTail, regionsToMove, max, min);
    }

    long endTime = EnvironmentEdgeManager.currentTime();

    // Leftover unassigned plans or an unmet deficit mean the passes above did not fully converge.
    if (!regionsToMove.isEmpty() || neededRegions != 0) {
      // Emit data so can diagnose how balancer went astray.
      LOG.warn(
        "regionsToMove=" + totalNumMoved + ", numServers=" + numServers + ", serversOverloaded="
          + serversOverloaded + ", serversUnderloaded=" + serversUnderloaded);
      // Dump the input as "server regionCount" pairs separated by ", ".
      StringBuilder sb = new StringBuilder();
      for (Map.Entry<ServerName, List<RegionInfo>> e : loadOfOneTable.entrySet()) {
        if (sb.length() > 0) {
          sb.append(", ");
        }
        sb.append(e.getKey().toString());
        sb.append(" ");
        sb.append(e.getValue().size());
      }
      LOG.warn("Input " + sb.toString());
    }

    // All done!
    LOG.info("Done. Calculated a load balance in " + (endTime - startTime) + "ms. " + "Moving "
      + totalNumMoved + " regions off of " + serversOverloaded + " overloaded servers onto "
      + serversUnderloaded + " less loaded servers");

    return regionsToReturn;
  }