in bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java [628:736]
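/**
 * Scan all entry log metadata, bucket each log by its space usage (one bucket
 * per 10% of usage), and compact every log whose usage falls below
 * {@code threshold}. Compaction stops early once {@code maxTimeMillis} has
 * elapsed (when positive) or the garbage collector thread is asked to stop,
 * but the usage histogram is always computed in full so that the reported
 * stats reflect the state before this compaction pass.
 */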
void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMetadataMapException {
    LOG.info("Do compaction to compact those entry logs with usage lower than {}", threshold);
    final int numBuckets = ENTRY_LOG_USAGE_SEGMENT_COUNT;
    int[] entryLogUsageBuckets = new int[numBuckets];
    int[] compactedBuckets = new int[numBuckets];
    ArrayList<LinkedList<Long>> compactableBuckets = new ArrayList<>(numBuckets);
    for (int i = 0; i < numBuckets; i++) {
        compactableBuckets.add(new LinkedList<>());
    }
    long start = System.currentTimeMillis();
    MutableLong end = new MutableLong(start);
    MutableLong timeDiff = new MutableLong(0);
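    // First pass: bucket every entry log by usage and queue the logs below the
    // threshold as compaction candidates, grouped bucket by bucket.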
    entryLogMetaMap.forEach((entryLogId, meta) -> {
        double usage = meta.getUsage();
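        // When sizing against the target entry log size, measure usage against
        // max(totalSize, entryLogSizeLimit) so that small logs are not treated
        // as well utilized merely because they are small.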
        if (conf.isUseTargetEntryLogSizeForGc() && usage < 1.0d) {
            usage = (double) meta.getRemainingSize() / Math.max(meta.getTotalSize(), conf.getEntryLogSizeLimit());
        }
        int bucketIndex = calculateUsageIndex(numBuckets, usage);
        entryLogUsageBuckets[bucketIndex]++;
        if (timeDiff.getValue() < maxTimeMillis) {
            end.setValue(System.currentTimeMillis());
            timeDiff.setValue(end.getValue() - start);
        }
        if (usage >= threshold
                || (maxTimeMillis > 0 && timeDiff.getValue() >= maxTimeMillis)
                || !running) {
            // We allow the usage limit calculation to continue so that we get an accurate
            // report of where the usage was prior to running compaction.
            return;
        }
        compactableBuckets.get(bucketIndex).add(meta.getEntryLogId());
    });
    currentEntryLogUsageBuckets = entryLogUsageBuckets;
    gcStats.setEntryLogUsageBuckets(currentEntryLogUsageBuckets);
    LOG.info(
            "Compaction: entry log usage buckets before compaction [10% 20% 30% 40% 50% 60% 70% 80% 90% 100%] = {}",
            entryLogUsageBuckets);
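    // Second pass: only buckets at or below the threshold's bucket can hold
    // compaction candidates; count them up front so progress can be reported.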
    final int maxBucket = calculateUsageIndex(numBuckets, threshold);
    int totalEntryLogIds = 0;
    for (int currBucket = 0; currBucket <= maxBucket; currBucket++) {
        totalEntryLogIds += compactableBuckets.get(currBucket).size();
    }
    long lastPrintTimestamp = 0;
    AtomicInteger processedEntryLogCnt = new AtomicInteger(0);
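    // Compact the least-used buckets first; the labeled break lets the loop
    // abandon all remaining buckets as soon as the time budget runs out.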
    stopCompaction:
    for (int currBucket = 0; currBucket <= maxBucket; currBucket++) {
        LinkedList<Long> entryLogIds = compactableBuckets.get(currBucket);
        while (!entryLogIds.isEmpty()) {
            if (timeDiff.getValue() < maxTimeMillis) {
                end.setValue(System.currentTimeMillis());
                timeDiff.setValue(end.getValue() - start);
            }
            if ((maxTimeMillis > 0 && timeDiff.getValue() >= maxTimeMillis) || !running) {
                // Stop compacting entirely: the time budget is exhausted or the
                // garbage collector has been asked to stop.
                break stopCompaction;
            }
            final int bucketIndex = currBucket;
            final long logId = entryLogIds.remove();
            if (System.currentTimeMillis() - lastPrintTimestamp >= MINUTE) {
                lastPrintTimestamp = System.currentTimeMillis();
                LOG.info("Compaction progress {} / {}, current compaction entryLogId: {}",
                        processedEntryLogCnt.get(), totalEntryLogIds, logId);
            }
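            // Look the metadata up again by key: the log may have been deleted
            // since the first pass, in which case the consumer sees meta == null.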
            entryLogMetaMap.forKey(logId, (entryLogId, meta) -> {
                if (meta == null) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Metadata for entry log {} already deleted", logId);
                    }
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Compacting entry log {} with usage {} below threshold {}",
                            meta.getEntryLogId(), meta.getUsage(), threshold);
                }
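                // Everything except the surviving (remaining) entries is
                // reclaimed once the old log is compacted and removed.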
                long priorRemainingSize = meta.getRemainingSize();
                compactEntryLog(meta);
                gcStats.getReclaimedSpaceViaCompaction().addCount(meta.getTotalSize() - priorRemainingSize);
                compactedBuckets[bucketIndex]++;
                processedEntryLogCnt.getAndIncrement();
            });
        }
    }
    if (LOG.isDebugEnabled()) {
        if (!running) {
            LOG.debug("Compaction exited due to gc not running");
        }
        if (maxTimeMillis > 0 && timeDiff.getValue() >= maxTimeMillis) {
            LOG.debug("Compaction ran for {}ms but was limited by {}ms", timeDiff, maxTimeMillis);
        }
    }
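    // Fraction of the scanned entry logs that were actually compacted this pass.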
    int totalEntryLogNum = Arrays.stream(entryLogUsageBuckets).sum();
    int compactedEntryLogNum = Arrays.stream(compactedBuckets).sum();
    this.entryLogCompactRatio = totalEntryLogNum == 0 ? 0 : (double) compactedEntryLogNum / totalEntryLogNum;
    LOG.info("Compaction: entry log usage buckets[10% 20% 30% 40% 50% 60% 70% 80% 90% 100%] = {}, compacted {}, "
            + "compacted entry log ratio {}", entryLogUsageBuckets, compactedBuckets, entryLogCompactRatio);
}
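
For context, a minimal sketch of how the GC cycle typically drives this method; the field names below are assumptions modeled on BookKeeper's standard compaction configuration, not code from this excerpt:

    // Hypothetical caller (assumed names: enableMajorCompaction, majorCompactionThreshold,
    // majorCompactionMaxTimeMillis, and their minor-compaction counterparts).
    long curTime = System.currentTimeMillis();
    if (enableMajorCompaction && curTime - lastMajorCompactionTime > majorCompactionInterval) {
        doCompactEntryLogs(majorCompactionThreshold, majorCompactionMaxTimeMillis);
        lastMajorCompactionTime = System.currentTimeMillis();
    } else if (enableMinorCompaction && curTime - lastMinorCompactionTime > minorCompactionInterval) {
        doCompactEntryLogs(minorCompactionThreshold, minorCompactionMaxTimeMillis);
        lastMinorCompactionTime = System.currentTimeMillis();
    }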