in hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java [400:594]
/**
 * Splits the regions of {@code tableName} one at a time at the split points returned by
 * {@code getSplits}, spreading the split requests across region servers.
 * <p>
 * Pending splits are bucketed by the region server that currently hosts the region to split. On
 * each pass at most one split is requested per server, visiting the servers that host the fewest
 * regions of the table first. When {@code split.verify} is true (the default) every requested
 * split is tracked as outstanding and the method blocks, polling {@code splitScan} every 30
 * seconds, whenever the number of outstanding splits reaches
 * {@code max(regionServerCount / 2, split.outstanding)}.
 * <p>
 * Progress is recorded by rewriting the {@code _balancedSplit} file in the table directory and
 * appending one {@code "- <start> <split>"} line per finished split. The file is closed and then
 * deleted in a {@code finally} block.
 * @param tableName table whose regions are split
 * @param splitAlgo algorithm used to compute split points and to render row keys as strings
 * @param conf configuration; {@code split.outstanding} (default 2) and {@code split.verify}
 *          (default true) are read from it
 * @throws IOException if a cluster or filesystem operation fails
 * @throws InterruptedException if interrupted while waiting for outstanding splits
 */
static void rollingSplit(TableName tableName, SplitAlgorithm splitAlgo, Configuration conf)
  throws IOException, InterruptedException {
  final int minOS = conf.getInt("split.outstanding", 2);
  // Loop-invariant, so read it once instead of on every split.
  final boolean verifySplits = conf.getBoolean("split.verify", true);
  try (Connection connection = ConnectionFactory.createConnection(conf)) {
    // Max outstanding splits. default == 50% of servers
    final int maxOutstanding = Math.max(getRegionServerCount(connection) / 2, minOS);
    Path hbDir = CommonFSUtils.getRootDir(conf);
    Path tableDir = CommonFSUtils.getTableDir(hbDir, tableName);
    Path splitFile = new Path(tableDir, "_balancedSplit");
    // Resolve the filesystem from the path itself: the HBase root directory is not necessarily
    // on the default filesystem that FileSystem.get(conf) would return.
    FileSystem fs = splitFile.getFileSystem(conf);
    // Get a list of daughter regions to create
    LinkedList<Pair<byte[], byte[]>> tmpRegionSet = getSplits(connection, tableName, splitAlgo);
    LinkedList<Pair<byte[], byte[]>> outstanding = Lists.newLinkedList();
    int splitCount = 0;
    final int origCount = tmpRegionSet.size();
    // all splits must compact & we have 1 compact thread, so 2 split
    // requests to the same RS can stall the outstanding split queue.
    // To fix, group the regions into an RS pool and round-robin through it
    LOG.debug("Bucketing regions by regionserver...");
    TreeMap<ServerName, LinkedList<Pair<byte[], byte[]>>> daughterRegions = Maps.newTreeMap();
    // Get a regionLocator. Need it in below.
    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
      for (Pair<byte[], byte[]> dr : tmpRegionSet) {
        ServerName rsLocation = regionLocator.getRegionLocation(dr.getSecond()).getServerName();
        addToServerBucket(daughterRegions, rsLocation, dr);
      }
      LOG.debug("Done with bucketing. Split time!");
      long startTime = EnvironmentEdgeManager.currentTime();
      // Open the split file and modify it as splits finish
      byte[] rawData = readFile(fs, splitFile);
      FSDataOutputStream splitOut = fs.create(splitFile);
      try {
        splitOut.write(rawData);
        try {
          // *** split code ***
          while (!daughterRegions.isEmpty()) {
            LOG.debug(daughterRegions.size() + " RS have regions to splt.");
            // Get ServerName to region count mapping
            final TreeMap<ServerName, Integer> rsSizes = Maps.newTreeMap();
            for (HRegionLocation hrl : regionLocator.getAllRegionLocations()) {
              ServerName sn = hrl.getServerName();
              Integer count = rsSizes.get(sn);
              rsSizes.put(sn, count == null ? 1 : count + 1);
            }
            // Round-robin through the ServerName list. Choose the lightest-loaded servers
            // first to keep the master from load-balancing regions as we split.
            // Iterate over a snapshot of the keys: the loop body adds and removes buckets, and
            // mutating the TreeMap while iterating its entry set would throw
            // ConcurrentModificationException.
            List<ServerName> serversLeft = Lists.newArrayList(daughterRegions.keySet());
            java.util.Collections.sort(serversLeft, new java.util.Comparator<ServerName>() {
              @Override
              public int compare(ServerName left, ServerName right) {
                // A server missing from rsSizes hosts no region of the table right now.
                Integer leftCount = rsSizes.get(left);
                Integer rightCount = rsSizes.get(right);
                return Integer.compare(leftCount == null ? 0 : leftCount,
                  rightCount == null ? 0 : rightCount);
              }
            });
            for (ServerName rsLoc : serversLeft) {
              LinkedList<Pair<byte[], byte[]>> regionList = daughterRegions.get(rsLoc);
              if (regionList == null) {
                // Defensive: every snapshot key should still be present at this point.
                continue;
              }
              Pair<byte[], byte[]> dr = null;
              // Find a region in the ServerName list that hasn't been moved
              LOG.debug("Finding a region on " + rsLoc);
              while (!regionList.isEmpty()) {
                dr = regionList.pop();
                // get current region info
                byte[] split = dr.getSecond();
                HRegionLocation regionLoc = regionLocator.getRegionLocation(split);
                // if this region moved locations
                ServerName newRs = regionLoc.getServerName();
                if (newRs.compareTo(rsLoc) != 0) {
                  LOG.debug("Region with " + splitAlgo.rowToStr(split) + " moved to " + newRs
                    + ". Relocating...");
                  // relocate it, don't use it right now
                  addToServerBucket(daughterRegions, newRs, dr);
                  dr = null;
                  continue;
                }
                // make sure this region wasn't already split
                byte[] sk = regionLoc.getRegion().getStartKey();
                if (sk.length != 0) {
                  if (Bytes.equals(split, sk)) {
                    LOG.debug("Region already split on " + splitAlgo.rowToStr(split)
                      + ". Skipping this region...");
                    ++splitCount;
                    dr = null;
                    continue;
                  }
                  byte[] start = dr.getFirst();
                  Preconditions.checkArgument(Bytes.equals(start, sk),
                    splitAlgo.rowToStr(start) + " != " + splitAlgo.rowToStr(sk));
                }
                // passed all checks! found a good region
                break;
              }
              if (regionList.isEmpty()) {
                daughterRegions.remove(rsLoc);
              }
              if (dr == null) {
                continue;
              }
              // we have a good region, time to split!
              byte[] split = dr.getSecond();
              LOG.debug("Splitting at " + splitAlgo.rowToStr(split));
              try (Admin admin = connection.getAdmin()) {
                admin.split(tableName, split);
              }
              LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
              if (verifySplits) {
                // we need to verify and rate-limit our splits
                outstanding.addLast(dr);
                // with too many outstanding splits, wait for some to finish
                while (outstanding.size() >= maxOutstanding) {
                  LOG.debug("Wait for outstanding splits " + outstanding.size());
                  LinkedList<Pair<byte[], byte[]>> localFinished =
                    splitScan(outstanding, connection, tableName, splitAlgo);
                  if (localFinished.isEmpty()) {
                    Thread.sleep(30 * 1000);
                  } else {
                    finished.addAll(localFinished);
                    outstanding.removeAll(localFinished);
                    LOG.debug(localFinished.size() + " outstanding splits finished");
                  }
                }
              } else {
                finished.add(dr);
              }
              // mark each finished region as successfully split.
              for (Pair<byte[], byte[]> region : finished) {
                writeFinishedSplit(splitOut, splitAlgo, region);
                splitCount++;
                if (splitCount % 10 == 0) {
                  long tDiff = (EnvironmentEdgeManager.currentTime() - startTime) / splitCount;
                  LOG.debug(
                    "STATUS UPDATE: " + splitCount + " / " + origCount + ". Avg Time / Split = "
                      + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
                }
              }
            }
          }
          if (verifySplits) {
            // Every split has been requested; drain whatever is still in flight.
            while (!outstanding.isEmpty()) {
              LOG.debug("Finally Wait for outstanding splits " + outstanding.size());
              LinkedList<Pair<byte[], byte[]>> finished =
                splitScan(outstanding, connection, tableName, splitAlgo);
              if (finished.isEmpty()) {
                Thread.sleep(30 * 1000);
              } else {
                outstanding.removeAll(finished);
                for (Pair<byte[], byte[]> region : finished) {
                  writeFinishedSplit(splitOut, splitAlgo, region);
                  splitCount++;
                }
                LOG.debug("Finally " + finished.size() + " outstanding splits finished");
              }
            }
          }
          LOG.debug("All regions have been successfully split!");
        } finally {
          long tDiff = EnvironmentEdgeManager.currentTime() - startTime;
          LOG.debug("TOTAL TIME = " + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
          LOG.debug("Splits = " + splitCount);
          if (0 < splitCount) {
            LOG.debug("Avg Time / Split = "
              + org.apache.hadoop.util.StringUtils.formatTime(tDiff / splitCount));
          }
        }
      } finally {
        splitOut.close();
        // NOTE(review): the progress file is removed even when the split loop failed, which
        // looks like it prevents resuming an interrupted run - confirm this is intended.
        fs.delete(splitFile, false);
      }
    }
  }
}

/** Appends {@code region} to the bucket of {@code server}, creating the bucket if absent. */
private static void addToServerBucket(
  Map<ServerName, LinkedList<Pair<byte[], byte[]>>> buckets, ServerName server,
  Pair<byte[], byte[]> region) {
  LinkedList<Pair<byte[], byte[]>> bucket = buckets.get(server);
  if (bucket == null) {
    bucket = Lists.newLinkedList();
    buckets.put(server, bucket);
  }
  bucket.add(region);
}

/** Writes the {@code "- <start> <split>"} progress line for one finished split. */
private static void writeFinishedSplit(FSDataOutputStream splitOut, SplitAlgorithm splitAlgo,
  Pair<byte[], byte[]> region) throws IOException {
  splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst()) + " "
    + splitAlgo.rowToStr(region.getSecond()) + "\n");
}