in tephra-hbase-compat-2.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java [104:156]
/**
 * Creates and starts the background daemon thread ("tephra-prune-upper-bound-writer") that
 * periodically persists the buffered contents of {@code pruneEntries} and {@code emptyRegions}
 * through {@code dataJanitorState}.
 *
 * <p>The thread wakes up once a second and performs a flush whenever the current wall-clock time
 * (milliseconds) exceeds {@code lastChecked + pruneFlushInterval}. It keeps running until it is
 * interrupted or the {@code stopped} flag is set.
 */
private void startFlushThread() {
  flushThread = new Thread("tephra-prune-upper-bound-writer") {
    @Override
    public void run() {
      // Poll until this thread is interrupted or the writer has been flagged as stopped.
      while (!(isInterrupted() || stopped)) {
        final long currentTime = System.currentTimeMillis();
        final boolean flushDue = (lastChecked + pruneFlushInterval) < currentTime;
        if (flushDue) {
          // The writes are executed through User.runAsLoginUser rather than directly.
          PrivilegedExceptionAction<Void> flushAction = new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
              // Drain the buffered prune upper bounds, one entry at a time.
              while (!pruneEntries.isEmpty()) {
                Map.Entry<byte[], Long> pending = pruneEntries.firstEntry();
                byte[] entryKey = pending.getKey();
                Long entryValue = pending.getValue();
                dataJanitorState.savePruneUpperBoundForRegion(entryKey, entryValue);
                // Conditional removal: drop the entry only if it still maps to the value that
                // was just written, so a newer value stored for the same key in the meantime
                // stays in the map and is picked up by a later pass of this loop.
                pruneEntries.remove(entryKey, entryValue);
              }
              // Drain the buffered empty-region records the same way.
              while (!emptyRegions.isEmpty()) {
                Map.Entry<byte[], Long> pending = emptyRegions.firstEntry();
                byte[] entryKey = pending.getKey();
                Long entryValue = pending.getValue();
                dataJanitorState.saveEmptyRegionForTime(entryValue, entryKey);
                // Same conditional removal as above, so a concurrently updated value is kept.
                emptyRegions.remove(entryKey, entryValue);
              }
              return null;
            }
          };
          try {
            User.runAsLoginUser(flushAction);
          } catch (IOException ex) {
            // A failed flush is only logged; entries that were not removed stay buffered.
            String message = "Cannot record prune upper bound for a region to table " +
              tableName.getNameWithNamespaceInclAsString();
            LOG.warn(message, ex);
          }
          // Updated on success and on failure alike, so the next attempt waits a full interval.
          lastChecked = currentTime;
        }
        try {
          TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException ex) {
          // Restore the interrupt status before leaving the loop.
          interrupt();
          break;
        }
      }
      LOG.info("PruneUpperBound Writer thread terminated.");
    }
  };
  flushThread.setDaemon(true);
  flushThread.start();
}