tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java [116:405]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@SuppressWarnings("WeakerAccess")
public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
  // Logger used by every method of this plugin
  public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);

  // Configuration handed to initialize(); it is also used there to open the HBase connection
  protected Configuration conf;
  // HBase connection opened in initialize() and closed in destroy()
  protected Connection connection;
  // Accessor for the prune state table; created in initialize() on top of the connection
  protected DataJanitorState dataJanitorState;

  /**
   * Opens the HBase connection, makes sure the prune state table exists and sets up the
   * {@link DataJanitorState} that reads and writes that table.
   *
   * @param conf configuration used to create the connection and to look up the prune state table name
   * @throws IOException if the connection cannot be created or the state table cannot be checked/created
   */
  @Override
  public void initialize(Configuration conf) throws IOException {
    this.conf = conf;
    this.connection = ConnectionFactory.createConnection(conf);

    String stateTableName = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
    final TableName stateTable = TableName.valueOf(stateTableName);
    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
    createPruneTable(stateTable);

    // A new Table handle is obtained from the shared connection every time the supplier is asked for one
    DataJanitorState.TableSupplier stateTableSupplier = new DataJanitorState.TableSupplier() {
      @Override
      public Table get() throws IOException {
        return connection.getTable(stateTable);
      }
    };
    this.dataJanitorState = new DataJanitorState(stateTableSupplier);
  }

  /**
   * Records the current transactional regions and the given inactive transaction bound against {@code time},
   * and then computes the prune upper bound starting from that set of regions.
   *
   * @param time time at which the prune run was started
   * @param inactiveTransactionBound inactive transaction bound to record for {@code time}
   * @return the computed prune upper bound, or -1 if either argument is negative or no bound could be computed
   * @throws IOException when not able to talk to HBase
   */
  @Override
  public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException {
    LOG.debug("Fetching prune upper bound for time {} and inactive transaction bound {}",
              time, inactiveTransactionBound);
    boolean validInput = time >= 0 && inactiveTransactionBound >= 0;
    if (!validInput) {
      return -1;
    }

    // Snapshot of all the regions that currently belong to transactional tables
    SortedSet<byte[]> currentRegions = getTransactionalRegions();
    boolean haveRegions = !currentRegions.isEmpty();
    if (haveRegions) {
      LOG.debug("Saving {} transactional regions for time {}", currentRegions.size(), time);
      dataJanitorState.saveRegionsForTime(time, currentRegions);
      // The inactive transaction bound is written last on purpose: its presence for a given time
      // tells a later reader that the data recorded for that time is complete
      LOG.debug("Saving inactive transaction bound {} for time {}", inactiveTransactionBound, time);
      dataJanitorState.saveInactiveTransactionBoundForTime(time, inactiveTransactionBound);
    }

    TimeRegions latestRegions = new TimeRegions(time, currentRegions);
    return computePruneUpperBound(latestRegions);
  }

  /**
   * After the invalid list has been pruned, this cleans up state information that is no longer required.
   * This includes -
   * <ul>
   *   <li>
   *     <i>(region, prune upper bound)</i> - prune upper bounds smaller than maxPrunedInvalid, skipping the
   *     regions that were saved on or before the given time
   *   </li>
   *   <li>
   *     <i>(t, set of regions)</i> - region sets that were recorded on or before the timestamp
   *     of maxPrunedInvalid
   *   </li>
   *   <li>
   *     <i>(t, inactive transaction bound)</i> - inactive transaction bounds that were recorded on or before
   *     the timestamp of maxPrunedInvalid
   *   </li>
   *   <li>
   *     empty region records that were recorded on or before the timestamp of maxPrunedInvalid
   *   </li>
   * </ul>
   *
   * @param time time of the prune run that has completed
   * @param maxPrunedInvalid prune upper bound that was used for the completed prune run
   * @throws IOException when not able to talk to HBase
   */
  @Override
  public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
    LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid);
    boolean validInput = time >= 0 && maxPrunedInvalid >= 0;
    if (!validInput) {
      return;
    }

    // The prune upper bounds of regions are written by TransactionProcessor while this class deletes them.
    // To stay clear of an update/delete race, the regions saved for the current time are excluded from the
    // deletion, so that only the prune upper bounds of stale regions are removed.
    TimeRegions liveRegions = dataJanitorState.getRegionsOnOrBeforeTime(time);
    if (liveRegions == null) {
      LOG.warn("Cannot find saved regions on or before time {}", time);
    } else {
      LOG.debug("Deleting prune upper bounds smaller than {} for stale regions", maxPrunedInvalid);
      dataJanitorState.deletePruneUpperBounds(maxPrunedInvalid, liveRegions.getRegions());
    }

    // Everything recorded on or before the timestamp of the prune upper bound is no longer needed
    long cutoffTime = TxUtils.getTimestamp(maxPrunedInvalid);
    LOG.debug("Deleting regions recorded before time {}", cutoffTime);
    dataJanitorState.deleteAllRegionsOnOrBeforeTime(cutoffTime);
    LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", cutoffTime);
    dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(cutoffTime);
    LOG.debug("Deleting empty regions recorded on or before time {}", cutoffTime);
    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(cutoffTime);
  }

  /**
   * Closes the HBase connection that was opened in {@link #initialize(Configuration)}.
   * This is a no-op when no connection was ever created, for example when initialization
   * failed or was never attempted. A failure to close the connection is logged and not propagated.
   */
  @Override
  public void destroy() {
    LOG.info("Stopping plugin...");
    if (connection == null) {
      // Nothing to release: avoids a NullPointerException when initialize() did not get to create the connection
      return;
    }
    try {
      connection.close();
    } catch (IOException e) {
      LOG.error("Got exception while closing HBase connection", e);
    }
  }

  /**
   * Creates the prune state table with the given {@link TableName}, unless the table is already present.
   * The table gets a single column family, {@link DataJanitorState#FAMILY}, keeping one version per cell.
   *
   * @param stateTable prune state table name
   * @throws IOException if the existence check or the table creation fails
   */
  protected void createPruneTable(TableName stateTable) throws IOException {
    try (Admin hbaseAdmin = this.connection.getAdmin()) {
      if (!hbaseAdmin.tableExists(stateTable)) {
        HTableDescriptor descriptor = new HTableDescriptor(stateTable);
        HColumnDescriptor family = new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1);
        descriptor.addFamily(family);
        hbaseAdmin.createTable(descriptor);
        LOG.info("Created pruneTable {}", stateTable.getNameWithNamespaceInclAsString());
      } else {
        LOG.debug("Not creating pruneStateTable {} since it already exists.",
                  stateTable.getNameWithNamespaceInclAsString());
      }
    } catch (TableExistsException alreadyCreated) {
      // Another client may create the prune state table between the existence check and the create call;
      // that outcome is expected and is as good as having created the table here
      LOG.debug("Not creating pruneStateTable {} since it already exists.",
                stateTable.getNameWithNamespaceInclAsString(), alreadyCreated);
    }
  }

  /**
   * Returns whether the table is a transactional table. By default, a table is identified as a transactional
   * table if it has the coprocessor {@link TransactionProcessor} attached to it. Should be overridden if the
   * users attach a different coprocessor.
   *
   * @param tableDescriptor {@link HTableDescriptor} of the table
   * @return true if the table is transactional
   */
  protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
    String coprocessorClassName = TransactionProcessor.class.getName();
    return tableDescriptor.hasCoprocessor(coprocessorClassName);
  }

  /**
   * Collects the region names of all the tables for which {@link #isTransactionalTable(HTableDescriptor)}
   * returns true.
   *
   * @return region names ordered by {@link Bytes#BYTES_COMPARATOR}; empty when there are no such regions
   * @throws IOException when not able to talk to HBase
   */
  protected SortedSet<byte[]> getTransactionalRegions() throws IOException {
    SortedSet<byte[]> regionNames = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    try (Admin hbaseAdmin = connection.getAdmin()) {
      HTableDescriptor[] descriptors = hbaseAdmin.listTables();
      int tableCount = descriptors == null ? 0 : descriptors.length;
      LOG.debug("Got {} tables to process", tableCount);

      for (int i = 0; i < tableCount; i++) {
        HTableDescriptor descriptor = descriptors[i];
        if (!isTransactionalTable(descriptor)) {
          LOG.debug("{} is not a transactional table", descriptor.getTableName());
          continue;
        }

        List<HRegionInfo> regionInfos = hbaseAdmin.getTableRegions(descriptor.getTableName());
        LOG.debug("Regions for table {}: {}", descriptor.getTableName(), regionInfos);
        if (regionInfos == null) {
          continue;
        }
        for (HRegionInfo regionInfo : regionInfos) {
          regionNames.add(regionInfo.getRegionName());
        }
      }
    }
    return regionNames;
  }

  /**
   * Try to find the latest set of regions in which all regions have been major compacted, and
   * compute prune upper bound from them. Starting from newest to oldest, this looks into the
   * region set that has been saved periodically, and joins it with the prune upper bound data
   * for a region recorded after a major compaction.
   * <p>
   * A region set is skipped when no inactive transaction bound was recorded for its time, or when at least
   * one of its regions (that still belongs to an existing table) has no prune upper bound. In both cases the
   * search moves on to the next older region set.
   *
   * @param timeRegions the latest set of regions
   * @return prune upper bound, or -1 if no saved region set yields one
   * @throws IOException when not able to talk to HBase
   */
  private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
    // Get the tables for the current time from the latest regions set
    final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions());
    LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables);

    // The step to the next older region set lives in the update clause of the loop, so that it is executed
    // on every iteration, including the ones that bail out early with "continue". Skipping that step would
    // re-examine the very same region set forever.
    for (TimeRegions current = timeRegions;
         current != null;
         current = dataJanitorState.getRegionsOnOrBeforeTime(current.getTime() - 1)) {
      LOG.debug("Computing prune upper bound for {}", current);
      SortedSet<byte[]> transactionalRegions = current.getRegions();
      long time = current.getTime();

      long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
      LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
      // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
      if (inactiveTransactionBound == -1) {
        LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
                    "and hence the data must be incomplete", time);
        continue;
      }

      // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds
      // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been
      // compacted. This ensures that transient tables do not block pruning progress.
      transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Transactional regions after removing the regions of non-existing tables = {}",
                  Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
      }

      // Get the prune upper bounds for all the transactional regions
      Map<byte[], Long> pruneUpperBoundRegions =
        dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
      logPruneUpperBoundRegions(pruneUpperBoundRegions);

      // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
      // recorded as empty after inactiveTransactionBoundTime will not have invalid data
      // for transactions started on or before inactiveTransactionBoundTime
      pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
                                                  pruneUpperBoundRegions);

      // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
      // across all regions
      if (!transactionalRegions.isEmpty() &&
        pruneUpperBoundRegions.size() == transactionalRegions.size()) {
        long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
        long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
        LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
        return pruneUpperBound;
      }

      if (LOG.isDebugEnabled()) {
        Sets.SetView<byte[]> difference =
          Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
        LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
                  time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
      }
    }
    return -1;
  }

  /**
   * Restricts the given regions to the ones whose table is contained in {@code existingTables}.
   *
   * @param existingTables tables that are considered to exist
   * @param transactionalRegions region names to filter
   * @return filtered view of {@code transactionalRegions} as returned by {@link Sets#filter}
   */
  private SortedSet<byte[]> filterDeletedTableRegions(final Set<TableName> existingTables,
                                                      SortedSet<byte[]> transactionalRegions) {
    Predicate<byte[]> belongsToExistingTable = new Predicate<byte[]>() {
      @Override
      public boolean apply(byte[] regionName) {
        TableName owningTable = HRegionInfo.getTable(regionName);
        return existingTables.contains(owningTable);
      }
    };
    return Sets.filter(transactionalRegions, belongsToExistingTable);
  }

  /**
   * Maps every given region name to its table, as determined by {@link HRegionInfo#getTable(byte[])}.
   *
   * @param regions region names
   * @return the distinct tables of the given regions
   */
  private Set<TableName> getTableNamesForRegions(Set<byte[]> regions) {
    Set<TableName> tables = new HashSet<>(regions.size());
    for (byte[] regionName : regions) {
      TableName owningTable = HRegionInfo.getTable(regionName);
      tables.add(owningTable);
    }
    return tables;
  }

  /**
   * Returns a copy of {@code pruneUpperBoundRegions} in which every empty region that has no prune upper bound
   * of its own is given {@code inactiveTransactionBound} as its prune upper bound. The empty regions are the
   * ones returned by {@code DataJanitorState.getEmptyRegionsAfterTime} for the timestamp of the bound.
   *
   * @param inactiveTransactionBound inactive transaction bound to use for the empty regions
   * @param transactionalRegions regions for which the empty region records are looked up
   * @param pruneUpperBoundRegions prune upper bounds that were recorded for the regions
   * @return unmodifiable map of region to prune upper bound, ordered by {@link Bytes#BYTES_COMPARATOR}
   * @throws IOException when not able to talk to HBase
   */
  private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
                                               SortedSet<byte[]> transactionalRegions,
                                               Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
    long boundTime = TxUtils.getTimestamp(inactiveTransactionBound);
    SortedSet<byte[]> regionsWithoutData =
      dataJanitorState.getEmptyRegionsAfterTime(boundTime, transactionalRegions);
    LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}",
              boundTime, Iterables.transform(regionsWithoutData, TimeRegions.BYTE_ARR_TO_STRING_FN));

    // A region recorded as empty after the time of the inactive transaction bound cannot hold invalid data of
    // transactions that started on or before that time, which makes the inactive transaction bound a valid
    // prune upper bound for it. A prune upper bound that was recorded for the region always takes precedence.
    Map<byte[], Long> merged = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    merged.putAll(pruneUpperBoundRegions);
    for (byte[] region : regionsWithoutData) {
      if (pruneUpperBoundRegions.containsKey(region)) {
        continue;
      }
      merged.put(region, inactiveTransactionBound);
    }
    return Collections.unmodifiableMap(merged);
  }

  private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Got region - prune upper bound map: {}",
                Iterables.transform(pruneUpperBoundRegions.entrySet(),
                                    new Function<Map.Entry<byte[], Long>, Map.Entry<String, Long>>() {
                                      @Override
                                      public Map.Entry<String, Long> apply(Map.Entry<byte[], Long> input) {
                                        String regionName = TimeRegions.BYTE_ARR_TO_STRING_FN.apply(input.getKey());
                                        return Maps.immutableEntry(regionName, input.getValue());
                                      }
                                    }));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



tephra-hbase-compat-2.4/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java [116:405]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@SuppressWarnings("WeakerAccess")
public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin {
  // Logger used by every method of this plugin
  public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);

  // Configuration handed to initialize(); it is also used there to open the HBase connection
  protected Configuration conf;
  // HBase connection opened in initialize() and closed in destroy()
  protected Connection connection;
  // Accessor for the prune state table; created in initialize() on top of the connection
  protected DataJanitorState dataJanitorState;

  /**
   * Opens the HBase connection, makes sure the prune state table exists and sets up the
   * {@link DataJanitorState} that reads and writes that table.
   *
   * @param conf configuration used to create the connection and to look up the prune state table name
   * @throws IOException if the connection cannot be created or the state table cannot be checked/created
   */
  @Override
  public void initialize(Configuration conf) throws IOException {
    this.conf = conf;
    this.connection = ConnectionFactory.createConnection(conf);

    String stateTableName = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                     TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
    final TableName stateTable = TableName.valueOf(stateTableName);
    LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString());
    createPruneTable(stateTable);

    // A new Table handle is obtained from the shared connection every time the supplier is asked for one
    DataJanitorState.TableSupplier stateTableSupplier = new DataJanitorState.TableSupplier() {
      @Override
      public Table get() throws IOException {
        return connection.getTable(stateTable);
      }
    };
    this.dataJanitorState = new DataJanitorState(stateTableSupplier);
  }

  /**
   * Records the current transactional regions and the given inactive transaction bound against {@code time},
   * and then computes the prune upper bound starting from that set of regions.
   *
   * @param time time at which the prune run was started
   * @param inactiveTransactionBound inactive transaction bound to record for {@code time}
   * @return the computed prune upper bound, or -1 if either argument is negative or no bound could be computed
   * @throws IOException when not able to talk to HBase
   */
  @Override
  public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException {
    LOG.debug("Fetching prune upper bound for time {} and inactive transaction bound {}",
              time, inactiveTransactionBound);
    boolean validInput = time >= 0 && inactiveTransactionBound >= 0;
    if (!validInput) {
      return -1;
    }

    // Snapshot of all the regions that currently belong to transactional tables
    SortedSet<byte[]> currentRegions = getTransactionalRegions();
    boolean haveRegions = !currentRegions.isEmpty();
    if (haveRegions) {
      LOG.debug("Saving {} transactional regions for time {}", currentRegions.size(), time);
      dataJanitorState.saveRegionsForTime(time, currentRegions);
      // The inactive transaction bound is written last on purpose: its presence for a given time
      // tells a later reader that the data recorded for that time is complete
      LOG.debug("Saving inactive transaction bound {} for time {}", inactiveTransactionBound, time);
      dataJanitorState.saveInactiveTransactionBoundForTime(time, inactiveTransactionBound);
    }

    TimeRegions latestRegions = new TimeRegions(time, currentRegions);
    return computePruneUpperBound(latestRegions);
  }

  /**
   * After the invalid list has been pruned, this cleans up state information that is no longer required.
   * This includes -
   * <ul>
   *   <li>
   *     <i>(region, prune upper bound)</i> - prune upper bounds smaller than maxPrunedInvalid, skipping the
   *     regions that were saved on or before the given time
   *   </li>
   *   <li>
   *     <i>(t, set of regions)</i> - region sets that were recorded on or before the timestamp
   *     of maxPrunedInvalid
   *   </li>
   *   <li>
   *     <i>(t, inactive transaction bound)</i> - inactive transaction bounds that were recorded on or before
   *     the timestamp of maxPrunedInvalid
   *   </li>
   *   <li>
   *     empty region records that were recorded on or before the timestamp of maxPrunedInvalid
   *   </li>
   * </ul>
   *
   * @param time time of the prune run that has completed
   * @param maxPrunedInvalid prune upper bound that was used for the completed prune run
   * @throws IOException when not able to talk to HBase
   */
  @Override
  public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
    LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid);
    boolean validInput = time >= 0 && maxPrunedInvalid >= 0;
    if (!validInput) {
      return;
    }

    // The prune upper bounds of regions are written by TransactionProcessor while this class deletes them.
    // To stay clear of an update/delete race, the regions saved for the current time are excluded from the
    // deletion, so that only the prune upper bounds of stale regions are removed.
    TimeRegions liveRegions = dataJanitorState.getRegionsOnOrBeforeTime(time);
    if (liveRegions == null) {
      LOG.warn("Cannot find saved regions on or before time {}", time);
    } else {
      LOG.debug("Deleting prune upper bounds smaller than {} for stale regions", maxPrunedInvalid);
      dataJanitorState.deletePruneUpperBounds(maxPrunedInvalid, liveRegions.getRegions());
    }

    // Everything recorded on or before the timestamp of the prune upper bound is no longer needed
    long cutoffTime = TxUtils.getTimestamp(maxPrunedInvalid);
    LOG.debug("Deleting regions recorded before time {}", cutoffTime);
    dataJanitorState.deleteAllRegionsOnOrBeforeTime(cutoffTime);
    LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", cutoffTime);
    dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(cutoffTime);
    LOG.debug("Deleting empty regions recorded on or before time {}", cutoffTime);
    dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(cutoffTime);
  }

  /**
   * Closes the HBase connection that was opened in {@link #initialize(Configuration)}.
   * This is a no-op when no connection was ever created, for example when initialization
   * failed or was never attempted. A failure to close the connection is logged and not propagated.
   */
  @Override
  public void destroy() {
    LOG.info("Stopping plugin...");
    if (connection == null) {
      // Nothing to release: avoids a NullPointerException when initialize() did not get to create the connection
      return;
    }
    try {
      connection.close();
    } catch (IOException e) {
      LOG.error("Got exception while closing HBase connection", e);
    }
  }

  /**
   * Creates the prune state table with the given {@link TableName}, unless the table is already present.
   * The table gets a single column family, {@link DataJanitorState#FAMILY}, keeping one version per cell.
   *
   * @param stateTable prune state table name
   * @throws IOException if the existence check or the table creation fails
   */
  protected void createPruneTable(TableName stateTable) throws IOException {
    try (Admin hbaseAdmin = this.connection.getAdmin()) {
      if (!hbaseAdmin.tableExists(stateTable)) {
        HTableDescriptor descriptor = new HTableDescriptor(stateTable);
        HColumnDescriptor family = new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1);
        descriptor.addFamily(family);
        hbaseAdmin.createTable(descriptor);
        LOG.info("Created pruneTable {}", stateTable.getNameWithNamespaceInclAsString());
      } else {
        LOG.debug("Not creating pruneStateTable {} since it already exists.",
                  stateTable.getNameWithNamespaceInclAsString());
      }
    } catch (TableExistsException alreadyCreated) {
      // Another client may create the prune state table between the existence check and the create call;
      // that outcome is expected and is as good as having created the table here
      LOG.debug("Not creating pruneStateTable {} since it already exists.",
                stateTable.getNameWithNamespaceInclAsString(), alreadyCreated);
    }
  }

  /**
   * Returns whether the table is a transactional table. By default, a table is identified as a transactional
   * table if it has the coprocessor {@link TransactionProcessor} attached to it. Should be overridden if the
   * users attach a different coprocessor.
   *
   * @param tableDescriptor {@link HTableDescriptor} of the table
   * @return true if the table is transactional
   */
  protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
    String coprocessorClassName = TransactionProcessor.class.getName();
    return tableDescriptor.hasCoprocessor(coprocessorClassName);
  }

  /**
   * Collects the region names of all the tables for which {@link #isTransactionalTable(HTableDescriptor)}
   * returns true.
   *
   * @return region names ordered by {@link Bytes#BYTES_COMPARATOR}; empty when there are no such regions
   * @throws IOException when not able to talk to HBase
   */
  protected SortedSet<byte[]> getTransactionalRegions() throws IOException {
    SortedSet<byte[]> regionNames = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    try (Admin hbaseAdmin = connection.getAdmin()) {
      HTableDescriptor[] descriptors = hbaseAdmin.listTables();
      int tableCount = descriptors == null ? 0 : descriptors.length;
      LOG.debug("Got {} tables to process", tableCount);

      for (int i = 0; i < tableCount; i++) {
        HTableDescriptor descriptor = descriptors[i];
        if (!isTransactionalTable(descriptor)) {
          LOG.debug("{} is not a transactional table", descriptor.getTableName());
          continue;
        }

        List<HRegionInfo> regionInfos = hbaseAdmin.getTableRegions(descriptor.getTableName());
        LOG.debug("Regions for table {}: {}", descriptor.getTableName(), regionInfos);
        if (regionInfos == null) {
          continue;
        }
        for (HRegionInfo regionInfo : regionInfos) {
          regionNames.add(regionInfo.getRegionName());
        }
      }
    }
    return regionNames;
  }

  /**
   * Try to find the latest set of regions in which all regions have been major compacted, and
   * compute prune upper bound from them. Starting from newest to oldest, this looks into the
   * region set that has been saved periodically, and joins it with the prune upper bound data
   * for a region recorded after a major compaction.
   * <p>
   * A region set is skipped when no inactive transaction bound was recorded for its time, or when at least
   * one of its regions (that still belongs to an existing table) has no prune upper bound. In both cases the
   * search moves on to the next older region set.
   *
   * @param timeRegions the latest set of regions
   * @return prune upper bound, or -1 if no saved region set yields one
   * @throws IOException when not able to talk to HBase
   */
  private long computePruneUpperBound(TimeRegions timeRegions) throws IOException {
    // Get the tables for the current time from the latest regions set
    final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions());
    LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables);

    // The step to the next older region set lives in the update clause of the loop, so that it is executed
    // on every iteration, including the ones that bail out early with "continue". Skipping that step would
    // re-examine the very same region set forever.
    for (TimeRegions current = timeRegions;
         current != null;
         current = dataJanitorState.getRegionsOnOrBeforeTime(current.getTime() - 1)) {
      LOG.debug("Computing prune upper bound for {}", current);
      SortedSet<byte[]> transactionalRegions = current.getRegions();
      long time = current.getTime();

      long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time);
      LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);
      // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions
      if (inactiveTransactionBound == -1) {
        LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " +
                    "and hence the data must be incomplete", time);
        continue;
      }

      // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds
      // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been
      // compacted. This ensures that transient tables do not block pruning progress.
      transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Transactional regions after removing the regions of non-existing tables = {}",
                  Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN));
      }

      // Get the prune upper bounds for all the transactional regions
      Map<byte[], Long> pruneUpperBoundRegions =
        dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions);
      logPruneUpperBoundRegions(pruneUpperBoundRegions);

      // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are
      // recorded as empty after inactiveTransactionBoundTime will not have invalid data
      // for transactions started on or before inactiveTransactionBoundTime
      pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions,
                                                  pruneUpperBoundRegions);

      // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound
      // across all regions
      if (!transactionalRegions.isEmpty() &&
        pruneUpperBoundRegions.size() == transactionalRegions.size()) {
        long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values());
        long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions);
        LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time);
        return pruneUpperBound;
      }

      if (LOG.isDebugEnabled()) {
        Sets.SetView<byte[]> difference =
          Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet());
        LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}",
                  time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN));
      }
    }
    return -1;
  }

  /**
   * Restricts the given regions to the ones whose table is contained in {@code existingTables}.
   *
   * @param existingTables tables that are considered to exist
   * @param transactionalRegions region names to filter
   * @return filtered view of {@code transactionalRegions} as returned by {@link Sets#filter}
   */
  private SortedSet<byte[]> filterDeletedTableRegions(final Set<TableName> existingTables,
                                                      SortedSet<byte[]> transactionalRegions) {
    Predicate<byte[]> belongsToExistingTable = new Predicate<byte[]>() {
      @Override
      public boolean apply(byte[] regionName) {
        TableName owningTable = HRegionInfo.getTable(regionName);
        return existingTables.contains(owningTable);
      }
    };
    return Sets.filter(transactionalRegions, belongsToExistingTable);
  }

  /**
   * Maps every given region name to its table, as determined by {@link HRegionInfo#getTable(byte[])}.
   *
   * @param regions region names
   * @return the distinct tables of the given regions
   */
  private Set<TableName> getTableNamesForRegions(Set<byte[]> regions) {
    Set<TableName> tables = new HashSet<>(regions.size());
    for (byte[] regionName : regions) {
      TableName owningTable = HRegionInfo.getTable(regionName);
      tables.add(owningTable);
    }
    return tables;
  }

  /**
   * Returns a copy of {@code pruneUpperBoundRegions} in which every empty region that has no prune upper bound
   * of its own is given {@code inactiveTransactionBound} as its prune upper bound. The empty regions are the
   * ones returned by {@code DataJanitorState.getEmptyRegionsAfterTime} for the timestamp of the bound.
   *
   * @param inactiveTransactionBound inactive transaction bound to use for the empty regions
   * @param transactionalRegions regions for which the empty region records are looked up
   * @param pruneUpperBoundRegions prune upper bounds that were recorded for the regions
   * @return unmodifiable map of region to prune upper bound, ordered by {@link Bytes#BYTES_COMPARATOR}
   * @throws IOException when not able to talk to HBase
   */
  private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound,
                                               SortedSet<byte[]> transactionalRegions,
                                               Map<byte[], Long> pruneUpperBoundRegions) throws IOException {
    long boundTime = TxUtils.getTimestamp(inactiveTransactionBound);
    SortedSet<byte[]> regionsWithoutData =
      dataJanitorState.getEmptyRegionsAfterTime(boundTime, transactionalRegions);
    LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}",
              boundTime, Iterables.transform(regionsWithoutData, TimeRegions.BYTE_ARR_TO_STRING_FN));

    // A region recorded as empty after the time of the inactive transaction bound cannot hold invalid data of
    // transactions that started on or before that time, which makes the inactive transaction bound a valid
    // prune upper bound for it. A prune upper bound that was recorded for the region always takes precedence.
    Map<byte[], Long> merged = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    merged.putAll(pruneUpperBoundRegions);
    for (byte[] region : regionsWithoutData) {
      if (pruneUpperBoundRegions.containsKey(region)) {
        continue;
      }
      merged.put(region, inactiveTransactionBound);
    }
    return Collections.unmodifiableMap(merged);
  }

  private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Got region - prune upper bound map: {}",
                Iterables.transform(pruneUpperBoundRegions.entrySet(),
                                    new Function<Map.Entry<byte[], Long>, Map.Entry<String, Long>>() {
                                      @Override
                                      public Map.Entry<String, Long> apply(Map.Entry<byte[], Long> input) {
                                        String regionName = TimeRegions.BYTE_ARR_TO_STRING_FN.apply(input.getKey());
                                        return Maps.immutableEntry(regionName, input.getValue());
                                      }
                                    }));
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



