in tephra-hbase-compat-2.0-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java [291:352]
/**
 * Computes the prune upper bound, starting from the given region set and stepping back to
 * earlier recorded region sets until one of them yields a usable bound.
 *
 * <p>For each candidate time the method reads the inactive transaction bound and the per-region
 * prune upper bounds. A candidate is used only when an inactive transaction bound is recorded
 * (i.e. is not {@code -1}) and, after dropping regions of tables that no longer exist and filling
 * in empty regions, a prune upper bound is known for every remaining transactional region. The
 * result is then the minimum of the inactive transaction bound and all region bounds.
 *
 * <p>The set of existing tables is derived once, from the region set passed in, and is reused
 * for every earlier time that is examined.
 *
 * @param timeRegions the region set to start from; must not be {@code null}
 * @return the prune upper bound, or {@code -1} if no examined time produced one
 * @throws IOException propagated from the underlying state and table lookups
 */
private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
  // Get the tables for the current time from the latest regions set
  final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions());
  LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables);

  // NOTE: this used to be a do-while with a "continue" for the incomplete-data case. In a
  // do-while, "continue" jumps directly to the loop condition, so the step back to an earlier
  // time was skipped and the same region set was re-examined forever. Every path that does not
  // return now falls through to the single advance statement at the bottom of the loop.
  while (timeRegions != null) {
    LOG.debug("Computing prune upper bound for {}", timeRegions);
    SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
    long time = timeRegions.getTime();
    long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
    LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);

    if (inactiveTransactionBound == -1) {
      // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
      if (LOG.isDebugEnabled()) {
        LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
                    "and hence the data must be incomplete", time);
      }
    } else {
      // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds
      // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been
      // compacted. This ensures that transient tables do not block pruning progress.
      transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Transactional regions after removing the regions of non-existing tables = {}",
                  Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
      }

      // Get the prune upper bounds for all the transactional regions
      Map<byte[], Long> pruneUpperBoundRegions =
        dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
      logPruneUpperBoundRegions(pruneUpperBoundRegions);

      // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
      // recorded as empty after inactiveTransactionBoundTime will not have invalid data
      // for transactions started on or before inactiveTransactionBoundTime
      pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
                                                  pruneUpperBoundRegions);

      // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
      // across all regions
      if (!transactionalRegions.isEmpty() &&
        pruneUpperBoundRegions.size() == transactionalRegions.size()) {
        Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
        long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
        LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
        return pruneUpperBound;
      }

      if (LOG.isDebugEnabled()) {
        Sets.SetView<byte[]> difference =
          Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
        LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
                  time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
      }
    }

    // Step back to the most recent region set recorded strictly before the time just examined
    timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time - 1);
  }
  return -1;
}