in server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java [669:919]
/**
 * Main loop of the compactor process.
 *
 * <p>Startup: starts the compactor client service, announces this compactor in ZooKeeper,
 * installs the service lock on the context, registers and initializes metrics, and schedules the
 * periodic cancel checker.
 *
 * <p>Loop (until {@code isShutdownRequested()} returns true or this thread is interrupted): when
 * due, sorts pending recovery logs; requests the next compaction job; runs the job on a dedicated
 * thread while periodically sending progress updates to the coordinator; then reports the job as
 * cancelled, failed, or completed.
 *
 * <p>Shutdown (outer {@code finally}): stops the local Thrift server, closes the volume manager,
 * logs GC info, marks shutdown complete, and releases the compactor lock.
 *
 * @throws RuntimeException if the client service cannot be started or the compactor cannot be
 *         registered in ZooKeeper
 */
public void run() {
  try {
    compactorAddress = startCompactorClientService();
  } catch (UnknownHostException e1) {
    throw new RuntimeException("Failed to start the compactor client service", e1);
  }
  final HostAndPort clientAddress = compactorAddress.getAddress();
  try {
    announceExistence(clientAddress);
  } catch (KeeperException | InterruptedException e) {
    throw new RuntimeException("Error registering compactor in ZooKeeper", e);
  }
  this.getContext().setServiceLock(compactorLock);
  MetricsInfo metricsInfo = getContext().getMetricsInfo();
  metricsInfo.addMetricsProducers(this, pausedMetrics);
  metricsInfo.init(getServiceTags(clientAddress));
  var watcher = new CompactionWatcher(getConfiguration());
  var schedExecutor = getContext().getScheduledExecutor();
  startCancelChecker(schedExecutor,
      getConfiguration().getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL));
  LOG.info("Compactor started, waiting for work");
  // NOTE(review): the shutdown cleanup in the outer finally below only covers failures from this
  // point on. If any startup step above throws, the already-started client service is not stopped
  // and the compactor lock is not released by this method.
  try {
    final AtomicReference<Throwable> err = new AtomicReference<>();
    final LogSorter logSorter = new LogSorter(this);
    long nextSortLogsCheckTime = System.currentTimeMillis();
    while (!isShutdownRequested()) {
      if (Thread.currentThread().isInterrupted()) {
        LOG.info("Server process thread has been interrupted, shutting down");
        break;
      }
      try {
        // mark compactor as idle while not in the compaction loop
        updateIdleStatus(true);
        // Clear per-job state left over from the previous iteration.
        currentCompactionId.set(null);
        err.set(null);
        JOB_HOLDER.reset();
        if (System.currentTimeMillis() > nextSortLogsCheckTime) {
          // Attempt to process all existing log sorting work serially in this thread.
          // When no work remains, this call will return so that we can look for compaction
          // work.
          LOG.debug("Checking to see if any recovery logs need sorting");
          // The returned value is used as the time of the next sort check.
          nextSortLogsCheckTime = logSorter.sortLogsIfNeeded();
        }
        TExternalCompactionJob job;
        try {
          TNextCompactionJob next = getNextJob(getNextId());
          job = next.getJob();
          if (!job.isSetExternalCompactionId()) {
            // No job was handed out: sleep (duration depends on the reported compactor count)
            // and then ask again.
            LOG.trace("No external compactions in queue {}", this.getResourceGroup());
            UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount()));
            continue;
          }
          // Sanity check: the returned job must carry the id this compactor supplied. A mismatch
          // is not caught by any handler inside the loop (only RetriesExceededException and
          // InterruptedException are), so it ends the main loop via the outer catch.
          // NOTE(review): assumes getNextId()/getNextJob() populated currentCompactionId (it was
          // cleared to null above); otherwise get() returns null here — confirm.
          if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) {
            throw new IllegalStateException("Returned eci " + job.getExternalCompactionId()
                + " does not match supplied eci " + currentCompactionId.get());
          }
        } catch (RetriesExceededException e2) {
          LOG.warn("Retries exceeded getting next job. Retrying...");
          continue;
        }
        LOG.debug("Received next compaction job: {}", job);
        // State handed to the compaction runnable: input totals, a latch awaited below until the
        // compaction has started, a latch used as the progress-loop exit condition, and the
        // shared error slot (presumably populated by the runnable — confirm in
        // createCompactionJob).
        final LongAdder totalInputEntries = new LongAdder();
        final LongAdder totalInputBytes = new LongAdder();
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch stopped = new CountDownLatch(1);
        final FileCompactorRunnable fcr =
            createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err);
        final Thread compactionThread =
            Threads.createThread("Compaction job for tablet " + job.getExtent().toString(), fcr);
        JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor());
        try {
          // mark compactor as busy while compacting
          updateIdleStatus(false);
          // Need to call FileCompactorRunnable.initialize after calling JOB_HOLDER.set
          fcr.initialize();
          compactionThread.start(); // start the compactionThread
          started.await(); // wait until the compactor is started
          // Input totals are sampled once, after the start latch has been released.
          final long inputEntries = totalInputEntries.sum();
          final long waitTime = calculateProgressCheckTime(totalInputBytes.sum());
          LOG.debug("Progress checks will occur every {} seconds", waitTime);
          String percentComplete = "unknown";
          // Wake every waitTime seconds until the stopped latch is released; on each tick send a
          // progress update for the running compaction, if one is visible.
          while (!stopped.await(waitTime, TimeUnit.SECONDS)) {
            List<CompactionInfo> running =
                org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions();
            if (!running.isEmpty()) {
              // Compaction has started. There should only be one in the list
              CompactionInfo info = running.get(0);
              if (info != null) {
                final long entriesRead = info.getEntriesRead();
                final long entriesWritten = info.getEntriesWritten();
                // The percentage is only computed when inputEntries > 0; otherwise it stays
                // "unknown" for the whole job.
                if (inputEntries > 0) {
                  percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100);
                }
                // The literal "%" is passed as a format argument rather than escaped as "%%".
                String message = String.format(
                    "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries",
                    entriesRead, inputEntries, percentComplete, "%", entriesWritten);
                watcher.run();
                try {
                  LOG.debug("Updating coordinator with compaction progress: {}.", message);
                  TCompactionStatusUpdate update = new TCompactionStatusUpdate(
                      TCompactionState.IN_PROGRESS, message, inputEntries, entriesRead,
                      entriesWritten, fcr.getCompactionAge().toNanos());
                  updateCompactionState(job, update);
                } catch (RetriesExceededException e) {
                  // A failed progress update is only logged; the compaction keeps running.
                  LOG.warn("Error updating coordinator with compaction progress, error: {}",
                      e.getMessage());
                }
              }
            } else {
              LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction");
            }
          }
          compactionThread.join();
          LOG.trace("Compaction thread finished.");
          // Run the watcher again to clear out the finished compaction and set the
          // stuck count to zero.
          watcher.run();
          if (err.get() != null) {
            // maybe the error occurred because the table was deleted or something like that, so
            // force a cancel check to possibly reduce noise in the logs
            checkIfCanceled();
          }
          // Report the outcome: cancelled, failed, or completed (checked in that order).
          // NOTE(review): getClass().equals(...) matches InterruptedException exactly, not its
          // subclasses; `instanceof` would be the broader check — confirm which is intended.
          if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled()
              || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) {
            LOG.warn("Compaction thread was interrupted, sending CANCELLED state");
            try {
              TCompactionStatusUpdate update =
                  new TCompactionStatusUpdate(TCompactionState.CANCELLED, "Compaction cancelled",
                      -1, -1, -1, fcr.getCompactionAge().toNanos());
              updateCompactionState(job, update);
              updateCompactionFailed(job);
            } catch (RetriesExceededException e) {
              LOG.error("Error updating coordinator with compaction cancellation.", e);
            } finally {
              currentCompactionId.set(null);
            }
          } else if (err.get() != null) {
            // An error was recorded for the job: report FAILED, including the error message.
            KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent());
            try {
              LOG.info("Updating coordinator with compaction failure: id: {}, extent: {}",
                  job.getExternalCompactionId(), fromThriftExtent);
              TCompactionStatusUpdate update = new TCompactionStatusUpdate(
                  TCompactionState.FAILED, "Compaction failed due to: " + err.get().getMessage(),
                  -1, -1, -1, fcr.getCompactionAge().toNanos());
              updateCompactionState(job, update);
              updateCompactionFailed(job);
            } catch (RetriesExceededException e) {
              LOG.error("Error updating coordinator with compaction failure: id: {}, extent: {}",
                  job.getExternalCompactionId(), fromThriftExtent, e);
            } finally {
              currentCompactionId.set(null);
            }
          } else {
            // Not cancelled and no error: report completion with the job stats. If completion
            // cannot be reported, the job is cancelled instead.
            try {
              LOG.trace("Updating coordinator with compaction completion.");
              updateCompactionCompleted(job, JOB_HOLDER.getStats());
            } catch (RetriesExceededException e) {
              LOG.error(
                  "Error updating coordinator with compaction completion, cancelling compaction.",
                  e);
              try {
                cancel(job.getExternalCompactionId());
              } catch (TException e1) {
                LOG.error("Error cancelling compaction.", e1);
              }
            } finally {
              currentCompactionId.set(null);
            }
          }
        } catch (RuntimeException e1) {
          // Any RuntimeException from the foreground work in the try block cancels the job.
          // NOTE(review): the log text mentions an interrupt while waiting for the compaction to
          // start, but this handler catches every RuntimeException thrown in the try block above.
          LOG.error(
              "Compactor thread was interrupted waiting for compaction to start, cancelling job",
              e1);
          try {
            cancel(job.getExternalCompactionId());
          } catch (TException e2) {
            LOG.error("Error cancelling compaction.", e2);
          }
        } finally {
          currentCompactionId.set(null);
          // mark compactor as idle after compaction completes
          updateIdleStatus(true);
          // In the case where there is an error in the foreground code the background compaction
          // may still be running. Must cancel it before starting another iteration of the loop to
          // avoid multiple threads updating shared state.
          // Interrupt, then wait up to 1 second; repeat until the thread has exited.
          while (compactionThread.isAlive()) {
            compactionThread.interrupt();
            compactionThread.join(1000);
          }
        }
      } catch (InterruptedException e) {
        // Thrown by blocking calls in this iteration, e.g. the latch waits and thread joins above
        // (including the join in the finally).
        // NOTE(review): the interrupt status is not restored here, so the interrupt check at the
        // top of the loop will not fire; leaving the loop depends on gracefulShutdown(...) making
        // isShutdownRequested() return true — confirm.
        LOG.info("Interrupt Exception received, shutting down");
        gracefulShutdown(getContext().rpcCreds());
      }
    } // end while
  } catch (Exception e) {
    // Any other exception escaping the loop ends it; the shutdown cleanup below still runs.
    LOG.error("Unhandled error occurred in Compactor", e);
  } finally {
    // Shutdown local thrift server
    LOG.debug("Stopping Thrift Servers");
    if (compactorAddress.server != null) {
      compactorAddress.server.stop();
    }
    try {
      LOG.debug("Closing filesystems");
      VolumeManager mgr = getContext().getVolumeManager();
      if (null != mgr) {
        mgr.close();
      }
    } catch (IOException e) {
      LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
    }
    getContext().getLowMemoryDetector().logGCInfo(getConfiguration());
    getShutdownComplete().set(true);
    LOG.info("stop requested. exiting ... ");
    // The compactor lock is released last; any failure is logged rather than propagated.
    try {
      if (null != compactorLock) {
        compactorLock.unlock();
      }
    } catch (Exception e) {
      LOG.warn("Failed to release compactor lock", e);
    }
  }
}