public void run()

in server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java [669:919]


  public void run() {
    // Main service loop of the compactor. Starts the client service, registers this process
    // in ZooKeeper and initializes metrics, then repeatedly requests an external compaction
    // job, runs it on a dedicated thread while reporting progress to the coordinator, and
    // finally reports the outcome (cancelled, failed or completed). On exit the thrift server
    // is stopped, the volume manager is closed and the compactor lock is released.

    try {
      compactorAddress = startCompactorClientService();
    } catch (UnknownHostException e1) {
      throw new RuntimeException("Failed to start the compactor client service", e1);
    }
    final HostAndPort clientAddress = compactorAddress.getAddress();

    try {
      announceExistence(clientAddress);
    } catch (KeeperException | InterruptedException e) {
      throw new RuntimeException("Error registering compactor in ZooKeeper", e);
    }
    // NOTE(review): compactorLock is presumably assigned inside announceExistence - confirm.
    this.getContext().setServiceLock(compactorLock);

    MetricsInfo metricsInfo = getContext().getMetricsInfo();

    metricsInfo.addMetricsProducers(this, pausedMetrics);
    metricsInfo.init(getServiceTags(clientAddress));

    var watcher = new CompactionWatcher(getConfiguration());
    var schedExecutor = getContext().getScheduledExecutor();

    // Start the cancel checker using the configured check interval.
    startCancelChecker(schedExecutor,
        getConfiguration().getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL));

    LOG.info("Compactor started, waiting for work");
    try {

      // Handed to every compaction job created below and inspected once its thread has
      // finished; it is cleared at the start of each loop iteration.
      final AtomicReference<Throwable> err = new AtomicReference<>();
      final LogSorter logSorter = new LogSorter(this);
      // Initialized to "now" so the first log sort check becomes due almost immediately.
      long nextSortLogsCheckTime = System.currentTimeMillis();

      while (!isShutdownRequested()) {
        if (Thread.currentThread().isInterrupted()) {
          LOG.info("Server process thread has been interrupted, shutting down");
          break;
        }
        try {
          // mark compactor as idle while not in the compaction loop
          updateIdleStatus(true);

          // Reset the per-job state left over from the previous iteration.
          currentCompactionId.set(null);
          err.set(null);
          JOB_HOLDER.reset();

          if (System.currentTimeMillis() > nextSortLogsCheckTime) {
            // Attempt to process all existing log sorting work serially in this thread.
            // When no work remains, this call will return so that we can look for compaction
            // work.
            LOG.debug("Checking to see if any recovery logs need sorting");
            // The returned value is the time at which the next check becomes due.
            nextSortLogsCheckTime = logSorter.sortLogsIfNeeded();
          }

          TExternalCompactionJob job;
          try {
            // NOTE(review): getNextId() presumably also stores the new id in
            // currentCompactionId, since it is dereferenced below after being nulled above -
            // confirm.
            TNextCompactionJob next = getNextJob(getNextId());
            job = next.getJob();
            // A job without an external compaction id means there is no work: wait, then try
            // again.
            if (!job.isSetExternalCompactionId()) {
              LOG.trace("No external compactions in queue {}", this.getResourceGroup());
              UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount()));
              continue;
            }
            // Sanity check: the returned job must carry the id that was supplied in the request.
            if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) {
              throw new IllegalStateException("Returned eci " + job.getExternalCompactionId()
                  + " does not match supplied eci " + currentCompactionId.get());
            }
          } catch (RetriesExceededException e2) {
            LOG.warn("Retries exceeded getting next job. Retrying...");
            continue;
          }
          LOG.debug("Received next compaction job: {}", job);

          // State shared with the compaction job: input size counters plus latches used below
          // to wait for the job to start and to stop.
          final LongAdder totalInputEntries = new LongAdder();
          final LongAdder totalInputBytes = new LongAdder();
          final CountDownLatch started = new CountDownLatch(1);
          final CountDownLatch stopped = new CountDownLatch(1);

          final FileCompactorRunnable fcr =
              createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err);

          final Thread compactionThread =
              Threads.createThread("Compaction job for tablet " + job.getExtent().toString(), fcr);

          JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor());

          try {
            // mark compactor as busy while compacting
            updateIdleStatus(false);

            // Need to call FileCompactorRunnable.initialize after calling JOB_HOLDER.set
            fcr.initialize();

            compactionThread.start(); // start the compactionThread
            started.await(); // wait until the compactor is started
            // The input totals are read only after the job has signalled that it started.
            final long inputEntries = totalInputEntries.sum();
            final long waitTime = calculateProgressCheckTime(totalInputBytes.sum());
            LOG.debug("Progress checks will occur every {} seconds", waitTime);
            String percentComplete = "unknown";

            // await() returns false on timeout, so this body runs once every waitTime seconds
            // until the stopped latch is released (presumably by the compaction job).
            while (!stopped.await(waitTime, TimeUnit.SECONDS)) {
              List<CompactionInfo> running =
                  org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions();
              if (!running.isEmpty()) {
                // Compaction has started. There should only be one in the list
                CompactionInfo info = running.get(0);
                if (info != null) {
                  final long entriesRead = info.getEntriesRead();
                  final long entriesWritten = info.getEntriesWritten();
                  // Guard against division by zero; percentComplete keeps its previous value
                  // (initially "unknown") when there are no input entries.
                  if (inputEntries > 0) {
                    percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100);
                  }
                  String message = String.format(
                      "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries",
                      entriesRead, inputEntries, percentComplete, "%", entriesWritten);
                  watcher.run();
                  try {
                    LOG.debug("Updating coordinator with compaction progress: {}.", message);
                    TCompactionStatusUpdate update = new TCompactionStatusUpdate(
                        TCompactionState.IN_PROGRESS, message, inputEntries, entriesRead,
                        entriesWritten, fcr.getCompactionAge().toNanos());
                    updateCompactionState(job, update);
                  } catch (RetriesExceededException e) {
                    // A failed progress update is only logged; the compaction keeps running.
                    LOG.warn("Error updating coordinator with compaction progress, error: {}",
                        e.getMessage());
                  }
                }
              } else {
                LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction");
              }
            }
            compactionThread.join();
            LOG.trace("Compaction thread finished.");
            // Run the watcher again to clear out the finished compaction and set the
            // stuck count to zero.
            watcher.run();

            if (err.get() != null) {
              // maybe the error occurred because the table was deleted or something like that, so
              // force a cancel check to possibly reduce noise in the logs
              checkIfCanceled();
            }

            // Report the outcome to the coordinator: cancelled/interrupted first, then failed,
            // otherwise completed.
            if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled()
                || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) {
              LOG.warn("Compaction thread was interrupted, sending CANCELLED state");
              try {
                TCompactionStatusUpdate update =
                    new TCompactionStatusUpdate(TCompactionState.CANCELLED, "Compaction cancelled",
                        -1, -1, -1, fcr.getCompactionAge().toNanos());
                updateCompactionState(job, update);
                updateCompactionFailed(job);
              } catch (RetriesExceededException e) {
                LOG.error("Error updating coordinator with compaction cancellation.", e);
              } finally {
                currentCompactionId.set(null);
              }
            } else if (err.get() != null) {
              KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
              try {
                LOG.info("Updating coordinator with compaction failure: id: {}, extent: {}",
                    job.getExternalCompactionId(), fromThriftExtent);
                TCompactionStatusUpdate update = new TCompactionStatusUpdate(
                    TCompactionState.FAILED, "Compaction failed due to: " + err.get().getMessage(),
                    -1, -1, -1, fcr.getCompactionAge().toNanos());
                updateCompactionState(job, update);
                updateCompactionFailed(job);
              } catch (RetriesExceededException e) {
                LOG.error("Error updating coordinator with compaction failure: id: {}, extent: {}",
                    job.getExternalCompactionId(), fromThriftExtent, e);
              } finally {
                currentCompactionId.set(null);
              }
            } else {
              try {
                LOG.trace("Updating coordinator with compaction completion.");
                updateCompactionCompleted(job, JOB_HOLDER.getStats());
              } catch (RetriesExceededException e) {
                // The completion could not be reported, so the compaction is cancelled instead.
                LOG.error(
                    "Error updating coordinator with compaction completion, cancelling compaction.",
                    e);
                try {
                  cancel(job.getExternalCompactionId());
                } catch (TException e1) {
                  LOG.error("Error cancelling compaction.", e1);
                }
              } finally {
                currentCompactionId.set(null);
              }
            }
          } catch (RuntimeException e1) {
            // NOTE(review): this handles any RuntimeException thrown while running or reporting
            // the job, although the log message below only mentions an interruption.
            LOG.error(
                "Compactor thread was interrupted waiting for compaction to start, cancelling job",
                e1);
            try {
              cancel(job.getExternalCompactionId());
            } catch (TException e2) {
              LOG.error("Error cancelling compaction.", e2);
            }
          } finally {
            currentCompactionId.set(null);

            // mark compactor as idle after compaction completes
            updateIdleStatus(true);

            // In the case where there is an error in the foreground code the background compaction
            // may still be running. Must cancel it before starting another iteration of the loop to
            // avoid multiple threads updating shared state.
            while (compactionThread.isAlive()) {
              compactionThread.interrupt();
              compactionThread.join(1000);
            }
          }
        } catch (InterruptedException e) {
          // Raised by the blocking await()/join() calls above when this thread is interrupted.
          // NOTE(review): the interrupt flag is not restored here, so leaving the loop
          // presumably relies on gracefulShutdown making isShutdownRequested() true - confirm.
          LOG.info("Interrupt Exception received, shutting down");
          gracefulShutdown(getContext().rpcCreds());
        }
      } // end while
    } catch (Exception e) {
      // Anything not handled inside the loop is logged here; cleanup below still runs.
      LOG.error("Unhandled error occurred in Compactor", e);
    } finally {
      // Shutdown local thrift server
      LOG.debug("Stopping Thrift Servers");
      if (compactorAddress.server != null) {
        compactorAddress.server.stop();
      }

      try {
        LOG.debug("Closing filesystems");
        VolumeManager mgr = getContext().getVolumeManager();
        if (null != mgr) {
          mgr.close();
        }
      } catch (IOException e) {
        LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
      }

      getContext().getLowMemoryDetector().logGCInfo(getConfiguration());
      // Flag shutdown as complete before attempting to release the compactor lock.
      getShutdownComplete().set(true);
      LOG.info("stop requested. exiting ... ");
      try {
        if (null != compactorLock) {
          compactorLock.unlock();
        }
      } catch (Exception e) {
        // Failing to release the lock is only logged; the method still returns normally.
        LOG.warn("Failed to release compactor lock", e);
      }
    }

  }