in hbase-tools/src/main/java/org/apache/hbase/RegionsMerger.java [177:254]
public void mergeRegions(String tblName, int targetRegions) throws Exception {
TableName table = TableName.valueOf(tblName);
Path tableDir = getTablePath(table);
try(Connection conn = ConnectionFactory.createConnection(conf)) {
Admin admin = conn.getAdmin();
LongAdder counter = new LongAdder();
LongAdder lastTimeProgessed = new LongAdder();
//need to get all regions for the table, regardless of region state
List<RegionInfo> regions = admin.getRegions(table);
Map<Future, Pair<RegionInfo, RegionInfo>> regionsMerging = new ConcurrentHashMap<>();
long roundsNoProgress = 0;
while (regions.size() > targetRegions) {
LOG.info("Iteration: {}", counter);
RegionInfo previous = null;
int regionSize = regions.size();
LOG.info("Attempting to merge {} regions to reach the target {} ...", regionSize,
targetRegions);
//to request merge, regions must be OPEN, though
regions = getOpenRegions(conn, table);
for (RegionInfo current : regions) {
if (!current.isSplit()) {
if (previous != null &&
canMerge(tableDir, previous, current,regionsMerging.values())) {
//Before submitting a merge request, we need to check if any of the region candidates
//still have merge references from previous cycle
boolean hasMergeRef =
hasPreviousMergeRef(conn, previous) || hasPreviousMergeRef(conn, current);
if(!hasMergeRef) {
Future f = admin.mergeRegionsAsync(current.getEncodedNameAsBytes(),
previous.getEncodedNameAsBytes(), true);
Pair<RegionInfo, RegionInfo> regionPair = new Pair<>(previous, current);
regionsMerging.put(f, regionPair);
if ((regionSize - regionsMerging.size()) <= targetRegions) {
break;
}
} else {
LOG.info("Skipping merge of candidates {} and {} because of existing merge "
+ "qualifiers.", previous.getEncodedName(), current.getEncodedName());
}
previous = null;
} else {
previous = current;
}
}
else{
LOG.debug("Skipping split region: {}", current.getEncodedName());
}
}
counter.increment();
LOG.info("Sleeping for {} seconds before next iteration...", (sleepBetweenCycles/1000));
Thread.sleep(sleepBetweenCycles);
regionsMerging.forEach((f, currentPair)-> {
if (f.isDone()) {
LOG.info("Merged regions {} and {} together.",
currentPair.getFirst().getEncodedName(),
currentPair.getSecond().getEncodedName());
regionsMerging.remove(f);
lastTimeProgessed.reset();
lastTimeProgessed.add(counter.longValue());
} else {
LOG.warn("Merge of regions {} and {} isn't completed yet.",
currentPair.getFirst(),
currentPair.getSecond());
}
});
roundsNoProgress = counter.longValue() - lastTimeProgessed.longValue();
if(roundsNoProgress == this.maxRoundsStuck){
LOG.warn("Reached {} iterations without progressing with new merges. Aborting...",
roundsNoProgress);
break;
}
//again, get all regions, regardless of the state,
// in order to avoid breaking the loop prematurely
regions = admin.getRegions(table);
}
}
}