private void startFlushThread()

in tephra-hbase-compat-2.4/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java [104:156]


  private void startFlushThread() {
    flushThread = new Thread("tephra-prune-upper-bound-writer") {
      @Override
      public void run() {
        while ((!isInterrupted()) && (!stopped)) {
          long now = System.currentTimeMillis();
          if (now > (lastChecked + pruneFlushInterval)) {
            // should flush data
            try {
              User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                  // Record prune upper bound
                  while (!pruneEntries.isEmpty()) {
                    Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
                    dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
                    // We can now remove the entry only if the key and value match with what we wrote since it is
                    // possible that a new pruneUpperBound for the same key has been added
                    pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
                  }
                  // Record empty regions
                  while (!emptyRegions.isEmpty()) {
                    Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
                    dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
                    // We can now remove the entry only if the key and value match with what we wrote since it is
                    // possible that a new value for the same key has been added
                    emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
                  }
                  return null;
                }
              });
            } catch (IOException ex) {
              LOG.warn("Cannot record prune upper bound for a region to table " +
                         tableName.getNameWithNamespaceInclAsString(), ex);
            }
            lastChecked = now;
          }

          try {
            TimeUnit.SECONDS.sleep(1);
          } catch (InterruptedException ex) {
            interrupt();
            break;
          }
        }

        LOG.info("PruneUpperBound Writer thread terminated.");
      }
    };

    flushThread.setDaemon(true);
    flushThread.start();
  }