in bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java [166:305]
public GarbageCollectorThread(ServerConfiguration conf,
LedgerManager ledgerManager,
final LedgerDirsManager ledgerDirsManager,
final CompactableLedgerStorage ledgerStorage,
EntryLogger entryLogger,
StatsLogger statsLogger,
ScheduledExecutorService gcExecutor)
throws IOException {
this.gcExecutor = gcExecutor;
this.conf = conf;
this.ledgerDirsManager = ledgerDirsManager;
this.entryLogger = entryLogger;
this.entryLogMetaMap = createEntryLogMetadataMap();
this.ledgerStorage = ledgerStorage;
this.gcWaitTime = conf.getGcWaitTime();
this.numActiveEntryLogs = 0;
this.activeEntryLogSize = 0L;
this.totalEntryLogSize = 0L;
this.entryLogCompactRatio = 0.0;
this.currentEntryLogUsageBuckets = new int[ENTRY_LOG_USAGE_SEGMENT_COUNT];
this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage, conf, statsLogger);
this.gcStats = new GarbageCollectorStats(
statsLogger,
() -> numActiveEntryLogs,
() -> activeEntryLogSize,
() -> totalEntryLogSize,
() -> garbageCollector.getNumActiveLedgers(),
() -> entryLogCompactRatio,
() -> currentEntryLogUsageBuckets
);
this.garbageCleaner = ledgerId -> {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("delete ledger : " + ledgerId);
}
gcStats.getDeletedLedgerCounter().inc();
ledgerStorage.deleteLedger(ledgerId);
} catch (IOException e) {
LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
}
};
// compaction parameters
minorCompactionThreshold = conf.getMinorCompactionThreshold();
minorCompactionInterval = conf.getMinorCompactionInterval() * SECOND;
majorCompactionThreshold = conf.getMajorCompactionThreshold();
majorCompactionInterval = conf.getMajorCompactionInterval() * SECOND;
isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace();
majorCompactionMaxTimeMillis = conf.getMajorCompactionMaxTimeMillis();
minorCompactionMaxTimeMillis = conf.getMinorCompactionMaxTimeMillis();
entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval() * SECOND;
if (entryLocationCompactionInterval > 0) {
randomCompactionDelay = ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval);
}
boolean isForceAllowCompaction = conf.isForceAllowCompaction();
AbstractLogCompactor.LogRemovalListener remover = new AbstractLogCompactor.LogRemovalListener() {
@Override
public void removeEntryLog(long logToRemove) {
try {
GarbageCollectorThread.this.removeEntryLog(logToRemove);
} catch (EntryLogMetadataMapException e) {
// Ignore and continue because ledger will not be cleaned up
// from entry-logger in this pass and will be taken care in
// next schedule task
LOG.warn("Failed to remove entry-log metadata {}", logToRemove, e);
}
}
};
if (conf.getUseTransactionalCompaction()) {
this.compactor = new TransactionalEntryLogCompactor(conf, entryLogger, ledgerStorage, remover);
} else {
this.compactor = new EntryLogCompactor(conf, entryLogger, ledgerStorage, remover);
}
this.throttler = new AbstractLogCompactor.Throttler(conf);
if (minorCompactionInterval > 0 && minorCompactionThreshold > 0) {
if (minorCompactionThreshold > 1.0d) {
throw new IOException("Invalid minor compaction threshold "
+ minorCompactionThreshold);
}
if (minorCompactionInterval < gcWaitTime) {
throw new IOException("Too short minor compaction interval : "
+ minorCompactionInterval);
}
enableMinorCompaction = true;
}
if (isForceAllowCompaction) {
if (minorCompactionThreshold > 0 && minorCompactionThreshold < 1.0d) {
isForceMinorCompactionAllow = true;
}
if (majorCompactionThreshold > 0 && majorCompactionThreshold < 1.0d) {
isForceMajorCompactionAllow = true;
}
}
if (majorCompactionInterval > 0 && majorCompactionThreshold > 0) {
if (majorCompactionThreshold > 1.0d) {
throw new IOException("Invalid major compaction threshold "
+ majorCompactionThreshold);
}
if (majorCompactionInterval < gcWaitTime) {
throw new IOException("Too short major compaction interval : "
+ majorCompactionInterval);
}
enableMajorCompaction = true;
}
if (enableMinorCompaction && enableMajorCompaction) {
if (minorCompactionInterval >= majorCompactionInterval
|| minorCompactionThreshold >= majorCompactionThreshold) {
throw new IOException("Invalid minor/major compaction settings : minor ("
+ minorCompactionThreshold + ", " + minorCompactionInterval
+ "), major (" + majorCompactionThreshold + ", "
+ majorCompactionInterval + ")");
}
}
if (entryLocationCompactionInterval > 0) {
if (entryLocationCompactionInterval < gcWaitTime) {
throw new IOException(
"Too short entry location compaction interval : " + entryLocationCompactionInterval);
}
}
LOG.info("Minor Compaction : enabled=" + enableMinorCompaction + ", threshold="
+ minorCompactionThreshold + ", interval=" + minorCompactionInterval);
LOG.info("Major Compaction : enabled=" + enableMajorCompaction + ", threshold="
+ majorCompactionThreshold + ", interval=" + majorCompactionInterval);
LOG.info("Entry Location Compaction : interval=" + entryLocationCompactionInterval + ", randomCompactionDelay="
+ randomCompactionDelay);
lastMinorCompactionTime = lastMajorCompactionTime =
lastEntryLocationCompactionTime = System.currentTimeMillis();
}