private long computePruneUpperBound()

in tephra-hbase-compat-2.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java [291:352]


  /**
   * Computes the prune upper bound by walking backwards in time over the recorded region sets, starting
   * from the given one, until a time is found for which the data is complete.
   *
   * <p>A time is usable when an inactive transaction bound was recorded for it and every transactional
   * region that belongs to a still-existing table has a prune upper bound (after empty regions have been
   * handled). For the first usable time, the result is the minimum of the inactive transaction bound and the
   * smallest per-region prune upper bound.
   *
   * @param timeRegions the most recent recorded time and its regions; the set of existing tables is derived
   *                    from this entry and reused for all earlier times that are examined
   * @return the prune upper bound for the first usable time, or {@code -1} if no recorded time is usable
   * @throws IOException if one of the underlying state lookups fails
   */
  private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
    // Get the tables for the current time from the latest regions set
    final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions());
    LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables);

    // The step to the previous region set lives in the for-loop update clause so that it runs on every
    // iteration, including the ones that bail out early with 'continue'. With a do-while and the step at the
    // end of the body, 'continue' would skip the step and re-examine the same time forever.
    for (TimeRegions current = timeRegions;
         current != null;
         current = dataJanitorState.getRegionsOnOrBeforeTime(current.getTime() - 1)) {
      LOG.debug("Computing prune upper bound for {}", current);
      SortedSet<byte[]> transactionalRegions = current.getRegions();
      long time = current.getTime();

      long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
      LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
      // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
      if (inactiveTransactionBound == -1) {
        LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
                    "and hence the data must be incomplete", time);
        continue;
      }

      // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds
      // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been
      // compacted. This ensures that transient tables do not block pruning progress.
      transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Transactional regions after removing the regions of non-existing tables = {}",
                  Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
      }

      // Get the prune upper bounds for all the transactional regions
      Map<byte[], Long> pruneUpperBoundRegions =
        dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
      logPruneUpperBoundRegions(pruneUpperBoundRegions);

      // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
      // recorded as empty after inactiveTransactionBoundTime will not have invalid data
      // for transactions started on or before inactiveTransactionBoundTime
      pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
                                                  pruneUpperBoundRegions);

      // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
      // across all regions
      if (!transactionalRegions.isEmpty() &&
        pruneUpperBoundRegions.size() == transactionalRegions.size()) {
        Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
        long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
        LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
        return pruneUpperBound;
      }

      // Data is incomplete for this time; fall through to the previous recorded region set
      if (LOG.isDebugEnabled()) {
        Sets.SetView<byte[]> difference =
          Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
        LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
                  time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
      }
    }
    // No recorded time had complete data
    return -1;
  }