in tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java [57:131]
/**
 * Executes a single prune pass over the invalid transaction list.
 *
 * <p>The pass proceeds as follows:
 * <ol>
 *   <li>A short transaction is started and immediately aborted, purely to read the current
 *       invalid list from it. If that list is empty the pass ends.</li>
 *   <li>Every configured plugin is asked for its prune upper bound. If any plugin throws,
 *       the whole pass is aborted without pruning.</li>
 *   <li>The minimum of the plugins' upper bounds is taken. If it is not positive the pass ends.</li>
 *   <li>All invalid ids less than or equal to that minimum are removed through
 *       {@code txManager.truncateInvalidTx}.</li>
 *   <li>Every plugin is notified through {@code pruneComplete} with the largest removed id;
 *       a failure in one plugin is logged and does not prevent the others from being notified.</li>
 * </ol>
 *
 * <p>Any {@link Exception} raised during the pass is logged here instead of being propagated
 * to the caller.
 */
public void run() {
  try {
    // TODO: TEPHRA-159 Start a read only transaction here
    // The transaction is only used to obtain the invalid list, so it is aborted right away.
    Transaction tx = txManager.startShort();
    txManager.abort(tx);

    long[] invalids = tx.getInvalids();
    if (invalids.length == 0) {
      LOG.info("Invalid list is empty, not running transaction pruning");
      return;
    }

    long now = getTime();
    long inactiveTransactionBound = TxUtils.getInactiveTxBound(now, txMaxLifetimeMillis + txPruneBufferMillis);
    LOG.info("Starting invalid prune run for time {} and inactive transaction bound {}",
             now, inactiveTransactionBound);

    // Collect the prune upper bound of every plugin; one failing plugin aborts the whole run.
    List<Long> pruneUpperBounds = new ArrayList<>();
    for (Map.Entry<String, TransactionPruningPlugin> entry : plugins.entrySet()) {
      String name = entry.getKey();
      TransactionPruningPlugin plugin = entry.getValue();
      try {
        LOG.debug("Fetching prune upper bound using plugin {}", name);
        long pruneUpperBound = plugin.fetchPruneUpperBound(now, inactiveTransactionBound);
        LOG.debug("Got prune upper bound {} from plugin {}", pruneUpperBound, name);
        pruneUpperBounds.add(pruneUpperBound);
      } catch (Exception e) {
        LOG.error("Aborting invalid prune run for time {} due to exception from plugin {}", now, name, e);
        return;
      }
    }

    // Collections.min throws NoSuchElementException on an empty collection, so bail out
    // explicitly when there are no plugins to supply an upper bound.
    if (pruneUpperBounds.isEmpty()) {
      LOG.info("No transaction pruning plugins are configured, not pruning invalid list");
      return;
    }

    long minPruneUpperBound = Collections.min(pruneUpperBounds);
    LOG.info("Got minimum prune upper bound {} across all plugins", minPruneUpperBound);
    if (minPruneUpperBound <= 0) {
      LOG.info("Not pruning invalid list since minimum prune upper bound ({}) is less than 1", minPruneUpperBound);
      return;
    }

    // Select every invalid id that is at or below the minimum upper bound.
    TreeSet<Long> toTruncate = new TreeSet<>();
    LOG.debug("Invalid list: {}", invalids);
    for (long invalid : invalids) {
      if (invalid <= minPruneUpperBound) {
        toTruncate.add(invalid);
      }
    }
    if (toTruncate.isEmpty()) {
      // Reaching here means every invalid id is strictly greater than the upper bound.
      // NOTE(review): invalids[0] is reported as the minimum invalid id, which presumes the
      // array is sorted in ascending order - confirm against Transaction#getInvalids.
      LOG.info("Not pruning invalid list since the min prune upper bound {} is less than the min invalid id {}",
               minPruneUpperBound, invalids[0]);
      return;
    }

    LOG.debug("Removing the following invalid ids from the invalid list: {}", toTruncate);
    txManager.truncateInvalidTx(toTruncate);
    LOG.info("Removed {} invalid ids from the invalid list", toTruncate.size());

    // Call prune complete on all plugins
    long maxPrunedInvalid = toTruncate.last();
    for (Map.Entry<String, TransactionPruningPlugin> entry : plugins.entrySet()) {
      String name = entry.getKey();
      TransactionPruningPlugin plugin = entry.getValue();
      try {
        LOG.debug("Calling prune complete on plugin {}", name);
        plugin.pruneComplete(now, maxPrunedInvalid);
      } catch (Exception e) {
        // Ignore any exceptions and continue with other plugins
        LOG.error("Got error while calling prune complete on plugin {}", name, e);
      }
    }

    LOG.info("Invalid prune run for time {} is complete", now);
  } catch (Exception e) {
    LOG.error("Got exception during invalid list prune run", e);
  }
}