in hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java [256:463]
protected List<RegionPlan> balanceTable(TableName tableName,
  Map<ServerName, List<RegionInfo>> loadOfOneTable) {
  // Computes region moves that bring every server hosting regions of this table into the band
  // [min, max], where min = floor(average) and max = ceil(average) of the per-server region
  // count. Phases:
  // 1. Walk servers from most to least loaded and unload each server above max down to max,
  //    queueing the unloaded regions in regionsToMove.
  // 2. Hand queued regions out round-robin (shuffled server order, alternating sweep
  //    direction) to servers below min, up to each server's deficit.
  // 3. If some deficit remains, take the next unload candidate (never a meta region) from each
  //    server in descending load order until the deficit is covered.
  // 4. Walk servers from least loaded and top up those still below min from the queue.
  // 5. If min != max, hand whatever is left to balanceOverall.
  // Returns null when neither this table nor the cluster as a whole needs balancing; otherwise
  // the list of region plans built via addRegionPlan/balanceOverall. tableName is not
  // referenced by this implementation. Region lists of overloaded servers are sorted in place
  // (NOTE(review): presumably these are the caller's lists held by ClusterLoadState — confirm).
  long startTime = EnvironmentEdgeManager.currentTime();

  // construct a Cluster object with clusterMap and rest of the
  // argument as defaults
  BalancerClusterState c =
    new BalancerClusterState(loadOfOneTable, null, this.regionFinder, this.rackManager);
  if (!needsBalance(c) && !this.overallNeedsBalance()) {
    return null;
  }
  ClusterLoadState cs = new ClusterLoadState(loadOfOneTable);
  int numServers = cs.getNumServers();
  NavigableMap<ServerAndLoad, List<RegionInfo>> serversByLoad = cs.getServersByLoad();
  int numRegions = cs.getNumRegions();
  float average = cs.getLoadAverage();
  // Target band: every server should end up holding between min and max regions.
  int max = (int) Math.ceil(average);
  int min = (int) average;

  // Using to check balance result. Only build the message when DEBUG is actually enabled.
  if (LOG.isDebugEnabled()) {
    LOG.debug("Balance parameter: numRegions=" + numRegions + ", numServers=" + numServers
      + ", max=" + max + ", min=" + min);
  }

  // Balance the cluster
  // TODO: Look at data block locality or a more complex load to do this
  MinMaxPriorityQueue<RegionPlan> regionsToMove =
    MinMaxPriorityQueue.orderedBy(rpComparator).create();
  List<RegionPlan> regionsToReturn = new ArrayList<>();

  // Phase 1: walk down most loaded, pruning each to the max
  int serversOverloaded = 0;
  // Selects which end of a region list is unloaded here, and is passed on to addRegionPlan and
  // balanceOverall. This method never sets it to true.
  boolean fetchFromTail = false;
  Map<ServerName, BalanceInfo> serverBalanceInfo = new TreeMap<>();
  for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.descendingMap()
    .entrySet()) {
    ServerAndLoad sal = server.getKey();
    int load = sal.getLoad();
    List<RegionInfo> regions = server.getValue();
    if (load <= max) {
      // Not overloaded; still registered so that every server has a BalanceInfo for the
      // later phases.
      serverBalanceInfo.put(sal.getServerName(), new BalanceInfo(0, 0, regions));
      continue;
    }
    serversOverloaded++;
    // Never try to unload more regions than the server's list actually holds.
    int numToOffload = Math.min(load - max, regions.size());
    // account for the out-of-band regions which were assigned to this server
    // after some other region server crashed
    Collections.sort(regions, riComparator);
    // Take exactly numToOffload regions from the selected end of the sorted list.
    for (int i = 0; i < numToOffload; i++) {
      RegionInfo hri = fetchFromTail ? regions.get(regions.size() - 1 - i) : regions.get(i);
      regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
    }
    // NOTE(review): presumably BalanceInfo(nextRegionForUnload, numRegionsAdded, regions) —
    // the next unload candidate follows the ones just taken and the server is numToOffload
    // regions down. Confirm against BalanceInfo.
    serverBalanceInfo.put(sal.getServerName(),
      new BalanceInfo(numToOffload, -numToOffload, regions));
  }
  int totalNumMoved = regionsToMove.size();

  // Phase 2: walk down least loaded, filling each to the min
  int neededRegions = 0; // number of regions needed to bring all up to min
  fetchFromTail = false;

  // Remaining deficit (min - load) of every server that starts below min.
  Map<ServerName, Integer> underloadedServers = new HashMap<>();
  int maxToTake = numRegions - min;
  for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.entrySet()) {
    // maxToTake shrinks by whole deficits, so test <= 0: an exact-zero test could step past
    // zero and never stop.
    if (maxToTake <= 0) {
      break; // no more to take
    }
    int load = server.getKey().getLoad();
    if (load >= min) {
      continue; // look for other servers which haven't reached min
    }
    int regionsToPut = min - load;
    maxToTake -= regionsToPut;
    underloadedServers.put(server.getKey().getServerName(), regionsToPut);
  }
  // number of servers that get new regions
  int serversUnderloaded = underloadedServers.size();
  // Visit underloaded servers in random order, sweeping alternately forwards and backwards so
  // that no server is systematically served first.
  List<ServerName> sns =
    Arrays.asList(underloadedServers.keySet().toArray(new ServerName[serversUnderloaded]));
  Collections.shuffle(sns);
  int incr = 1;
  while (!regionsToMove.isEmpty()) {
    int cnt = 0;
    int i = incr > 0 ? 0 : sns.size() - 1;
    for (; i >= 0 && i < sns.size(); i += incr) {
      if (regionsToMove.isEmpty()) {
        break;
      }
      ServerName si = sns.get(i);
      int numToTake = underloadedServers.get(si);
      if (numToTake == 0) {
        continue; // this server already reached min
      }
      addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
      underloadedServers.put(si, numToTake - 1);
      cnt++;
      BalanceInfo bi = serverBalanceInfo.get(si);
      bi.setNumRegionsAdded(bi.getNumRegionsAdded() + 1);
    }
    if (cnt == 0) {
      break; // no underloaded server can take more
    }
    // iterates underloadedServers in the other direction
    incr = -incr;
  }
  for (int remaining : underloadedServers.values()) {
    // If we still want to take some, increment needed
    neededRegions += remaining;
  }

  // Phase 3: need to do a second pass.
  // Either more regions to assign out or servers that are still underloaded.
  // If we need more to fill min, grab one from each most loaded until enough.
  if (neededRegions != 0) {
    // Walk down most loaded, grabbing one from each until we get enough
    for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.descendingMap()
      .entrySet()) {
      // Every server in serversByLoad was registered in phase 1, so balanceInfo is expected
      // to be non-null. NOTE(review): the null branch below is not honoured by the
      // unconditional dereferences further down.
      BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
      int idx = balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
      if (idx >= server.getValue().size()) {
        break;
      }
      RegionInfo region = server.getValue().get(idx);
      if (region.isMetaRegion()) {
        continue; // Don't move meta regions.
      }
      regionsToMove.add(new RegionPlan(region, server.getKey().getServerName(), null));
      balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() - 1);
      balanceInfo.setNextRegionForUnload(balanceInfo.getNextRegionForUnload() + 1);
      totalNumMoved++;
      if (--neededRegions == 0) {
        // No more regions needed, done shedding
        break;
      }
    }
  }

  // Phase 4: now we have a set of regions that must be all assigned out.
  // Assign each underloaded up to the min, then if leftovers, assign to max.
  // Walk down least loaded, assigning to each to fill up to min; servers are visited in
  // ascending load order, so the first one whose original load reaches min ends the walk.
  for (Map.Entry<ServerAndLoad, List<RegionInfo>> server : serversByLoad.entrySet()) {
    int regionCount = server.getKey().getLoad();
    if (regionCount >= min) {
      break;
    }
    BalanceInfo balanceInfo = serverBalanceInfo.get(server.getKey().getServerName());
    if (balanceInfo != null) {
      // Account for regions already added (or removed) in the earlier phases.
      regionCount += balanceInfo.getNumRegionsAdded();
    }
    if (regionCount >= min) {
      continue;
    }
    int numToTake = min - regionCount;
    int numTaken = 0;
    while (numTaken < numToTake && !regionsToMove.isEmpty()) {
      addRegionPlan(regionsToMove, fetchFromTail, server.getKey().getServerName(),
        regionsToReturn);
      numTaken++;
      balanceInfo.setNumRegionsAdded(balanceInfo.getNumRegionsAdded() + 1);
    }
  }

  // Phase 5: with a non-empty [min, max] band, spread the leftovers up to max.
  if (min != max) {
    balanceOverall(regionsToReturn, serverBalanceInfo, fetchFromTail, regionsToMove, max, min);
  }

  long endTime = EnvironmentEdgeManager.currentTime();

  if (!regionsToMove.isEmpty() || neededRegions != 0) {
    // Emit data so can diagnose how balancer went astray.
    LOG.warn(
      "regionsToMove=" + totalNumMoved + ", numServers=" + numServers + ", serversOverloaded="
        + serversOverloaded + ", serversUnderloaded=" + serversUnderloaded);
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<ServerName, List<RegionInfo>> e : loadOfOneTable.entrySet()) {
      if (sb.length() > 0) {
        sb.append(", ");
      }
      sb.append(e.getKey().toString());
      sb.append(" ");
      sb.append(e.getValue().size());
    }
    LOG.warn("Input " + sb.toString());
  }

  // All done!
  LOG.info("Done. Calculated a load balance in " + (endTime - startTime) + "ms. " + "Moving "
    + totalNumMoved + " regions off of " + serversOverloaded + " overloaded servers onto "
    + serversUnderloaded + " less loaded servers");

  return regionsToReturn;
}