static void rollingSplit()

in hbase-server/src/main/java/org/apache/hadoop/hbase/util/RegionSplitter.java [400:594]


  /**
   * Splits the regions of {@code tableName} at the split points returned by
   * {@code getSplits(connection, tableName, splitAlgo)}, recording progress in the table's
   * {@code _balancedSplit} file.
   * <p>
   * The pending splits are bucketed by the region server currently hosting them. Each round
   * visits every server once, lightest-loaded (fewest regions of this table) first, and issues at
   * most one split request per server. This keeps several concurrent split requests from landing
   * on the same server. A region found to have moved is re-bucketed under its new server. A region
   * whose start key already equals its split point is counted as done and skipped.
   * <p>
   * When {@code split.verify} is {@code true} (the default), issued splits are tracked as
   * outstanding. Once {@code max(regionServerCount / 2, split.outstanding)} are in flight, the
   * method polls {@code splitScan} (sleeping 30 seconds between empty results) until some finish.
   * It then waits for all remaining outstanding splits before returning. Every finished split is
   * appended to the split file as a {@code "- <start> <split>"} line.
   * <p>
   * The split file is deleted when the method exits, whether or not every split succeeded.
   * @param tableName table whose regions are split
   * @param splitAlgo algorithm used to render row keys in log messages and the split file
   * @param conf      configuration; reads {@code split.outstanding} (default 2) and
   *                  {@code split.verify} (default true)
   * @throws IOException          on cluster or filesystem failures
   * @throws InterruptedException if interrupted while waiting for outstanding splits
   */
  static void rollingSplit(TableName tableName, SplitAlgorithm splitAlgo, Configuration conf)
    throws IOException, InterruptedException {
    final int minOS = conf.getInt("split.outstanding", 2);
    final boolean verifySplits = conf.getBoolean("split.verify", true);
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // Max outstanding splits. default == 50% of servers
      final int maxOutstanding = Math.max(getRegionServerCount(connection) / 2, minOS);

      Path hbDir = CommonFSUtils.getRootDir(conf);
      Path tableDir = CommonFSUtils.getTableDir(hbDir, tableName);
      Path splitFile = new Path(tableDir, "_balancedSplit");
      FileSystem fs = FileSystem.get(conf);

      // Get a list of daughter regions to create
      LinkedList<Pair<byte[], byte[]>> regionsToSplit = getSplits(connection, tableName, splitAlgo);
      LinkedList<Pair<byte[], byte[]>> outstanding = Lists.newLinkedList();
      int splitCount = 0;
      final int origCount = regionsToSplit.size();

      // all splits must compact & we have 1 compact thread, so 2 split
      // requests to the same RS can stall the outstanding split queue.
      // To fix, group the regions into an RS pool and round-robin through it
      LOG.debug("Bucketing regions by regionserver...");
      TreeMap<ServerName, LinkedList<Pair<byte[], byte[]>>> daughterRegions = Maps.newTreeMap();
      try (RegionLocator regionLocator = connection.getRegionLocator(tableName);
        Admin admin = connection.getAdmin()) {
        for (Pair<byte[], byte[]> dr : regionsToSplit) {
          ServerName rsLocation = regionLocator.getRegionLocation(dr.getSecond()).getServerName();
          daughterRegions.computeIfAbsent(rsLocation, k -> Lists.newLinkedList()).add(dr);
        }
        LOG.debug("Done with bucketing.  Split time!");
        long startTime = EnvironmentEdgeManager.currentTime();

        // Open the split file and modify it as splits finish
        byte[] rawData = readFile(fs, splitFile);

        FSDataOutputStream splitOut = fs.create(splitFile);
        try {
          splitOut.write(rawData);

          try {
            while (!daughterRegions.isEmpty()) {
              LOG.debug(daughterRegions.size() + " RS have regions to split.");

              // Visit servers lightest-loaded first to keep the master from load-balancing
              // regions as we split. Iterate over a snapshot of the keys: the loop body adds
              // (relocation) and removes (exhausted server) daughterRegions entries, which would
              // make a live entrySet() iterator throw ConcurrentModificationException.
              final Map<ServerName, Integer> rsSizes = countRegionsByServer(regionLocator);
              List<ServerName> serversByLoad = Lists.newArrayList(daughterRegions.keySet());
              // List.sort is stable, so servers with equal load keep ServerName order.
              serversByLoad.sort((a, b) -> Integer.compare(rsSizes.getOrDefault(a, 0),
                rsSizes.getOrDefault(b, 0)));

              for (ServerName rsLoc : serversByLoad) {
                LinkedList<Pair<byte[], byte[]>> regionList = daughterRegions.get(rsLoc);
                if (regionList == null) {
                  // Defensive: only the server being visited is ever removed.
                  continue;
                }
                Pair<byte[], byte[]> dr = null;

                // Find a region in the ServerName list that hasn't been moved
                LOG.debug("Finding a region on " + rsLoc);
                while (!regionList.isEmpty()) {
                  dr = regionList.pop();

                  // get current region info
                  byte[] split = dr.getSecond();
                  HRegionLocation regionLoc = regionLocator.getRegionLocation(split);

                  // if this region moved locations, re-bucket it and don't use it right now
                  ServerName newRs = regionLoc.getServerName();
                  if (newRs.compareTo(rsLoc) != 0) {
                    LOG.debug("Region with " + splitAlgo.rowToStr(split) + " moved to " + newRs
                      + ". Relocating...");
                    daughterRegions.computeIfAbsent(newRs, k -> Lists.newLinkedList()).add(dr);
                    dr = null;
                    continue;
                  }

                  // make sure this region wasn't already split
                  byte[] sk = regionLoc.getRegion().getStartKey();
                  if (sk.length != 0) {
                    if (Bytes.equals(split, sk)) {
                      LOG.debug("Region already split on " + splitAlgo.rowToStr(split)
                        + ".  Skipping this region...");
                      ++splitCount;
                      dr = null;
                      continue;
                    }
                    byte[] start = dr.getFirst();
                    Preconditions.checkArgument(Bytes.equals(start, sk),
                      splitAlgo.rowToStr(start) + " != " + splitAlgo.rowToStr(sk));
                  }

                  // passed all checks! found a good region
                  break;
                }
                if (regionList.isEmpty()) {
                  daughterRegions.remove(rsLoc);
                }
                if (dr == null) {
                  continue;
                }

                // we have a good region, time to split!
                byte[] split = dr.getSecond();
                LOG.debug("Splitting at " + splitAlgo.rowToStr(split));
                admin.split(tableName, split);

                List<Pair<byte[], byte[]>> finished = Lists.newArrayList();
                if (verifySplits) {
                  // we need to verify and rate-limit our splits
                  outstanding.addLast(dr);
                  // with too many outstanding splits, wait for some to finish
                  while (outstanding.size() >= maxOutstanding) {
                    LOG.debug("Wait for outstanding splits " + outstanding.size());
                    LinkedList<Pair<byte[], byte[]>> localFinished =
                      splitScan(outstanding, connection, tableName, splitAlgo);
                    if (localFinished.isEmpty()) {
                      Thread.sleep(30 * 1000);
                    } else {
                      finished.addAll(localFinished);
                      outstanding.removeAll(localFinished);
                      LOG.debug(localFinished.size() + " outstanding splits finished");
                    }
                  }
                } else {
                  finished.add(dr);
                }

                // mark each finished region as successfully split.
                for (Pair<byte[], byte[]> region : finished) {
                  markSplitDone(splitOut, splitAlgo, region);
                  splitCount++;
                  if (splitCount % 10 == 0) {
                    long tDiff = (EnvironmentEdgeManager.currentTime() - startTime) / splitCount;
                    LOG.debug(
                      "STATUS UPDATE: " + splitCount + " / " + origCount + ". Avg Time / Split = "
                        + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
                  }
                }
              }
            }
            if (verifySplits) {
              while (!outstanding.isEmpty()) {
                LOG.debug("Finally Wait for outstanding splits " + outstanding.size());
                LinkedList<Pair<byte[], byte[]>> finished =
                  splitScan(outstanding, connection, tableName, splitAlgo);
                if (finished.isEmpty()) {
                  Thread.sleep(30 * 1000);
                } else {
                  outstanding.removeAll(finished);
                  for (Pair<byte[], byte[]> region : finished) {
                    markSplitDone(splitOut, splitAlgo, region);
                    splitCount++;
                  }
                  LOG.debug("Finally " + finished.size() + " outstanding splits finished");
                }
              }
            }
            LOG.debug("All regions have been successfully split!");
          } finally {
            long tDiff = EnvironmentEdgeManager.currentTime() - startTime;
            LOG.debug("TOTAL TIME = " + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
            LOG.debug("Splits = " + splitCount);
            if (0 < splitCount) {
              LOG.debug("Avg Time / Split = "
                + org.apache.hadoop.util.StringUtils.formatTime(tDiff / splitCount));
            }
          }
        } finally {
          splitOut.close();
          fs.delete(splitFile, false);
        }
      }
    }
  }

  /**
   * Counts how many of the locator's table regions each region server currently hosts.
   * @param regionLocator locator for the table being split
   * @return map from hosting server to its region count; servers hosting none are absent
   * @throws IOException if the region locations cannot be fetched
   */
  private static Map<ServerName, Integer> countRegionsByServer(RegionLocator regionLocator)
    throws IOException {
    Map<ServerName, Integer> counts = Maps.newHashMap();
    for (HRegionLocation hrl : regionLocator.getAllRegionLocations()) {
      counts.merge(hrl.getServerName(), 1, Integer::sum);
    }
    return counts;
  }

  /**
   * Appends a {@code "- <start> <split>\n"} line to the split file, marking {@code region} as
   * successfully split. The line is written with {@code writeChars}, i.e. two bytes per char.
   * @param splitOut  open output stream of the split file
   * @param splitAlgo algorithm used to render the row keys
   * @param region    pair of (region start key, split point)
   * @throws IOException if the write fails
   */
  private static void markSplitDone(FSDataOutputStream splitOut, SplitAlgorithm splitAlgo,
    Pair<byte[], byte[]> region) throws IOException {
    splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst()) + " "
      + splitAlgo.rowToStr(region.getSecond()) + "\n");
  }