in server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java [156:361]
public void run() {
  // Main service loop of the garbage collector. In order, this method:
  // 1. waits for any pending upgrade to complete (exits the JVM if interrupted),
  // 2. starts the stats service and acquires the garbage collector lock (exits the JVM on
  // failure),
  // 3. registers and initializes metrics,
  // 4. sleeps for the configured start delay,
  // 5. runs collection cycles until shutdown is requested or the thread is interrupted; each
  // cycle collects files for the ROOT, METADATA and USER data levels, then collects
  // write-ahead logs, then runs the configured post action and sleeps for the cycle delay,
  // 6. marks shutdown as complete and releases the lock.
  try {
    waitForUpgrade();
  } catch (InterruptedException e) {
    log.error("Interrupted while waiting for upgrade to complete, exiting...");
    System.exit(1);
  }
  final VolumeManager fs = getContext().getVolumeManager();

  log.info("Trying to acquire ZooKeeper lock for garbage collector");
  HostAndPort address = startStatsService();
  try {
    getZooLock(address);
  } catch (Exception ex) {
    log.error("{}", ex.getMessage(), ex);
    System.exit(1);
  }
  this.getContext().setServiceLock(gcLock);

  MetricsInfo metricsInfo = getContext().getMetricsInfo();
  metricsInfo.addMetricsProducers(this, new GcMetrics(this));
  metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(),
      address, getResourceGroup()));

  // Sleep for an initial period, giving the manager time to start up and
  // old data files to be unused
  try {
    long delay = getStartDelay();
    log.debug("Sleeping for {} milliseconds before beginning garbage collection cycles", delay);
    Thread.sleep(delay);
  } catch (InterruptedException e) {
    log.warn("{}", e.getMessage(), e);
    // Restore the interrupt status before bailing out so that whoever owns this thread can still
    // observe the interruption; nothing else in this method runs after this point.
    Thread.currentThread().interrupt();
    return;
  }

  // This is created outside of the run loop and passed to the walogCollector so that
  // only a single timed task is created (internal to LiveTServerSet) using SimpleTimer.
  final LiveTServerSet liveTServerSet =
      new LiveTServerSet(getContext(), (current, deleted, added) -> {
        log.debug("Number of current servers {}, tservers added {}, removed {}",
            current == null ? -1 : current.size(), added, deleted);
        if (log.isTraceEnabled()) {
          log.trace("Current servers: {}\nAdded: {}\n Removed: {}", current, added, deleted);
        }
      });

  while (!isShutdownRequested()) {
    if (Thread.currentThread().isInterrupted()) {
      log.info("Server process thread has been interrupted, shutting down");
      break;
    }
    try {
      Span outerSpan = TraceUtil.startSpan(this.getClass(), "gc");
      try (Scope outerScope = outerSpan.makeCurrent()) {
        Span innerSpan = TraceUtil.startSpan(this.getClass(), "loop");
        try (Scope innerScope = innerSpan.makeCurrent()) {
          final long tStart = System.nanoTime();
          try {
            System.gc(); // make room

            status.current.started = System.currentTimeMillis();

            var rootGC = new GCRun(DataLevel.ROOT, getContext());
            var mdGC = new GCRun(DataLevel.METADATA, getContext());
            var userGC = new GCRun(DataLevel.USER, getContext());

            log.info("Starting Root table Garbage Collection.");
            status.current.bulks += new GarbageCollectionAlgorithm().collect(rootGC);
            incrementStatsForRun(rootGC);
            logStats();

            log.info("Starting Metadata table Garbage Collection.");
            status.current.bulks += new GarbageCollectionAlgorithm().collect(mdGC);
            incrementStatsForRun(mdGC);
            logStats();

            log.info("Starting User table Garbage Collection.");
            status.current.bulks += new GarbageCollectionAlgorithm().collect(userGC);
            incrementStatsForRun(userGC);
            logStats();
          } catch (Exception e) {
            // A failed file collection is logged and recorded on the span but not rethrown, so
            // the cycle still continues with write-ahead log collection below.
            TraceUtil.setException(innerSpan, e, false);
            log.error("{}", e.getMessage(), e);
          } finally {
            // Whether or not collection succeeded, publish this cycle's stats as the last
            // cycle and start a fresh stats object for the next one.
            status.current.finished = System.currentTimeMillis();
            status.last = status.current;
            gcCycleMetrics.setLastCollect(status.current);
            status.current = new GcCycleStats();
          }

          final long tStop = System.nanoTime();
          log.info("Collect cycle took {} seconds",
              String.format("%.2f", (TimeUnit.NANOSECONDS.toMillis(tStop - tStart) / 1000.0)));

          // Clean up any unused write-ahead logs
          Span walSpan = TraceUtil.startSpan(this.getClass(), "walogs");
          try (Scope walScope = walSpan.makeCurrent()) {
            GarbageCollectWriteAheadLogs walogCollector =
                new GarbageCollectWriteAheadLogs(getContext(), fs, liveTServerSet);
            log.info("Beginning garbage collection of write-ahead logs");
            walogCollector.collect(status);
            gcCycleMetrics.setLastWalCollect(status.lastLog);
          } catch (Exception e) {
            // Same policy as file collection: log, record on the span, keep the cycle going.
            TraceUtil.setException(walSpan, e, false);
            log.error("{}", e.getMessage(), e);
          } finally {
            walSpan.end();
          }
        } catch (Exception e) {
          TraceUtil.setException(innerSpan, e, true);
          throw e;
        } finally {
          innerSpan.end();
        }

        // we just made a lot of metadata changes: flush them out
        try {
          AccumuloClient accumuloClient = getContext();

          final long actionStart = System.nanoTime();

          String action = getConfiguration().get(Property.GC_USE_FULL_COMPACTION);
          log.debug("gc post action {} started", action);

          switch (action) {
            case "compact":
              accumuloClient.tableOperations().compact(SystemTables.METADATA.tableName(), null,
                  null, true, true);
              accumuloClient.tableOperations().compact(SystemTables.ROOT.tableName(), null, null,
                  true, true);
              break;
            case "flush":
              accumuloClient.tableOperations().flush(SystemTables.METADATA.tableName(), null,
                  null, true);
              accumuloClient.tableOperations().flush(SystemTables.ROOT.tableName(), null, null,
                  true);
              break;
            default:
              log.trace("'none - no action' or invalid value provided: {}", action);
          }

          final long actionComplete = System.nanoTime();

          gcCycleMetrics.setPostOpDurationNanos(actionComplete - actionStart);

          log.info("gc post action {} completed in {} seconds", action, String.format("%.2f",
              (TimeUnit.NANOSECONDS.toMillis(actionComplete - actionStart) / 1000.0)));
        } catch (Exception e) {
          // A failed post action only produces a warning; the loop carries on to the sleep.
          TraceUtil.setException(outerSpan, e, false);
          log.warn("{}", e.getMessage(), e);
        }
      } catch (Exception e) {
        TraceUtil.setException(outerSpan, e, true);
        throw e;
      } finally {
        outerSpan.end();
      }

      try {
        gcCycleMetrics.incrementRunCycleCount();
        long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY);

        // Once three cycle delays have elapsed since the last check, group the system tables by
        // their assigned resource group and warn about every group that has no compactors.
        if (lastCompactorCheck.hasElapsed(gcDelay * 3, MILLISECONDS)) {
          Map<String,Set<TableId>> resourceMapping = new HashMap<>();
          for (TableId tid : SystemTables.tableIds()) {
            TableConfiguration tconf = getContext().getTableConfiguration(tid);
            String resourceGroup = tconf.get(TableLoadBalancer.TABLE_ASSIGNMENT_GROUP_PROPERTY);
            resourceGroup =
                resourceGroup == null ? Constants.DEFAULT_RESOURCE_GROUP_NAME : resourceGroup;
            resourceMapping.computeIfAbsent(resourceGroup, k -> new HashSet<>()).add(tid);
          }
          for (Entry<String,Set<TableId>> e : resourceMapping.entrySet()) {
            if (ExternalCompactionUtil.countCompactors(e.getKey(), getContext()) == 0) {
              log.warn("No Compactors exist in resource group {} for system table {}", e.getKey(),
                  e.getValue());
            }
          }
          lastCompactorCheck.restart();
        }

        log.debug("Sleeping for {} milliseconds", gcDelay);
        Thread.sleep(gcDelay);
      } catch (InterruptedException e) {
        log.warn("{}", e.getMessage(), e);
        throw e;
      }
    } catch (InterruptedException e) {
      // The interrupt status is intentionally NOT restored here: the graceful shutdown below and
      // the lock release after the loop still have to run on this thread.
      log.info("Interrupt Exception received, shutting down");
      gracefulShutdown(getContext().rpcCreds());
    }
  }

  getShutdownComplete().set(true);
  log.info("stop requested. exiting ... ");
  try {
    gcLock.unlock();
  } catch (Exception e) {
    log.warn("Failed to release GarbageCollector lock", e);
  }
}