public GarbageCollectorThread()

in bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java [166:305]


    /**
     * Create a garbage collector thread, wiring up its collaborators and reading
     * and validating the compaction settings from the configuration.
     *
     * <p>Minor (resp. major) compaction is enabled only when both its interval and
     * its threshold are positive. Every enabled interval must be at least as long as
     * the GC wait time, and when both compactions are enabled the minor interval and
     * threshold must each be strictly smaller than the major ones.
     *
     * @param conf
     *          server configuration the GC wait time and compaction settings are read from
     * @param ledgerManager
     *          ledger manager handed to the {@link ScanAndCompareGarbageCollector}
     * @param ledgerDirsManager
     *          ledger directories manager retained by this thread
     * @param ledgerStorage
     *          ledger storage used for ledger deletion and handed to the garbage
     *          collector and the compactor
     * @param entryLogger
     *          entry logger retained by this thread and handed to the compactor
     * @param statsLogger
     *          stats logger used for the GC stats and handed to the garbage collector
     * @param gcExecutor
     *          executor retained by this thread
     * @throws IOException
     *          if a compaction threshold is greater than 1.0, if an enabled compaction
     *          interval is shorter than the GC wait time, or if the minor settings are
     *          not strictly smaller than the major settings while both are enabled
     */
    public GarbageCollectorThread(ServerConfiguration conf,
                                  LedgerManager ledgerManager,
                                  final LedgerDirsManager ledgerDirsManager,
                                  final CompactableLedgerStorage ledgerStorage,
                                  EntryLogger entryLogger,
                                  StatsLogger statsLogger,
                                  ScheduledExecutorService gcExecutor)
        throws IOException {
        this.gcExecutor = gcExecutor;
        this.conf = conf;

        this.ledgerDirsManager = ledgerDirsManager;
        this.entryLogger = entryLogger;
        this.entryLogMetaMap = createEntryLogMetadataMap();
        this.ledgerStorage = ledgerStorage;
        this.gcWaitTime = conf.getGcWaitTime();

        // Values exposed through the stats suppliers below; they all start out empty.
        this.numActiveEntryLogs = 0;
        this.activeEntryLogSize = 0L;
        this.totalEntryLogSize = 0L;
        this.entryLogCompactRatio = 0.0;
        this.currentEntryLogUsageBuckets = new int[ENTRY_LOG_USAGE_SEGMENT_COUNT];
        this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, ledgerStorage, conf, statsLogger);
        // The suppliers read the fields lazily, so the stats always see the current values.
        this.gcStats = new GarbageCollectorStats(
            statsLogger,
            () -> numActiveEntryLogs,
            () -> activeEntryLogSize,
            () -> totalEntryLogSize,
            () -> garbageCollector.getNumActiveLedgers(),
            () -> entryLogCompactRatio,
            () -> currentEntryLogUsageBuckets
        );

        this.garbageCleaner = ledgerId -> {
            try {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("delete ledger : {}", ledgerId);
                }
                ledgerStorage.deleteLedger(ledgerId);
                // Count the ledger only after the storage accepted the deletion, so a
                // failed delete is not reported as a deleted ledger.
                gcStats.getDeletedLedgerCounter().inc();
            } catch (IOException e) {
                // Best effort: log (with the ledger id, so the failure can be traced) and carry on.
                LOG.error("Exception when deleting the ledger index file of ledger {} on the Bookie: ",
                          ledgerId, e);
            }
        };

        // compaction parameters
        // Intervals are scaled by SECOND before being compared with gcWaitTime below
        // (presumably seconds -> milliseconds; confirm against the SECOND constant).
        minorCompactionThreshold = conf.getMinorCompactionThreshold();
        minorCompactionInterval = conf.getMinorCompactionInterval() * SECOND;
        majorCompactionThreshold = conf.getMajorCompactionThreshold();
        majorCompactionInterval = conf.getMajorCompactionInterval() * SECOND;
        isForceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace();
        majorCompactionMaxTimeMillis = conf.getMajorCompactionMaxTimeMillis();
        minorCompactionMaxTimeMillis = conf.getMinorCompactionMaxTimeMillis();
        entryLocationCompactionInterval = conf.getEntryLocationCompactionInterval() * SECOND;
        if (entryLocationCompactionInterval > 0) {
            // Random delay in [0, interval); nextLong(bound) requires a positive bound, hence the guard.
            randomCompactionDelay = ThreadLocalRandom.current().nextLong(entryLocationCompactionInterval);
        }

        boolean isForceAllowCompaction = conf.isForceAllowCompaction();

        // Callback through which the compactor asks this thread to drop an entry log.
        AbstractLogCompactor.LogRemovalListener remover = new AbstractLogCompactor.LogRemovalListener() {
            @Override
            public void removeEntryLog(long logToRemove) {
                try {
                    GarbageCollectorThread.this.removeEntryLog(logToRemove);
                } catch (EntryLogMetadataMapException e) {
                    // Ignore and continue because ledger will not be cleaned up
                    // from entry-logger in this pass and will be taken care in
                    // next schedule task
                    LOG.warn("Failed to remove entry-log metadata {}", logToRemove, e);
                }
            }
        };
        if (conf.getUseTransactionalCompaction()) {
            this.compactor = new TransactionalEntryLogCompactor(conf, entryLogger, ledgerStorage, remover);
        } else {
            this.compactor = new EntryLogCompactor(conf, entryLogger, ledgerStorage, remover);
        }

        this.throttler = new AbstractLogCompactor.Throttler(conf);
        // Minor compaction is on only when both interval and threshold are positive.
        if (minorCompactionInterval > 0 && minorCompactionThreshold > 0) {
            if (minorCompactionThreshold > 1.0d) {
                throw new IOException("Invalid minor compaction threshold "
                                    + minorCompactionThreshold);
            }
            if (minorCompactionInterval < gcWaitTime) {
                throw new IOException("Too short minor compaction interval : "
                                    + minorCompactionInterval);
            }
            enableMinorCompaction = true;
        }

        // Forced compaction is decided from the thresholds alone (strictly between 0 and 1),
        // independently of the intervals.
        if (isForceAllowCompaction) {
            if (minorCompactionThreshold > 0 && minorCompactionThreshold < 1.0d) {
                isForceMinorCompactionAllow = true;
            }
            if (majorCompactionThreshold > 0 && majorCompactionThreshold < 1.0d) {
                isForceMajorCompactionAllow = true;
            }
        }

        // Major compaction is on only when both interval and threshold are positive.
        if (majorCompactionInterval > 0 && majorCompactionThreshold > 0) {
            if (majorCompactionThreshold > 1.0d) {
                throw new IOException("Invalid major compaction threshold "
                                    + majorCompactionThreshold);
            }
            if (majorCompactionInterval < gcWaitTime) {
                throw new IOException("Too short major compaction interval : "
                                    + majorCompactionInterval);
            }
            enableMajorCompaction = true;
        }

        // With both enabled, minor must be strictly smaller than major in interval and threshold.
        if (enableMinorCompaction && enableMajorCompaction) {
            if (minorCompactionInterval >= majorCompactionInterval
                || minorCompactionThreshold >= majorCompactionThreshold) {
                throw new IOException("Invalid minor/major compaction settings : minor ("
                                    + minorCompactionThreshold + ", " + minorCompactionInterval
                                    + "), major (" + majorCompactionThreshold + ", "
                                    + majorCompactionInterval + ")");
            }
        }

        if (entryLocationCompactionInterval > 0) {
            if (entryLocationCompactionInterval < gcWaitTime) {
                throw new IOException(
                        "Too short entry location compaction interval : " + entryLocationCompactionInterval);
            }
        }

        LOG.info("Minor Compaction : enabled={}, threshold={}, interval={}",
                 enableMinorCompaction, minorCompactionThreshold, minorCompactionInterval);
        LOG.info("Major Compaction : enabled={}, threshold={}, interval={}",
                 enableMajorCompaction, majorCompactionThreshold, majorCompactionInterval);
        LOG.info("Entry Location Compaction : interval={}, randomCompactionDelay={}",
                 entryLocationCompactionInterval, randomCompactionDelay);

        // All "last run" timestamps start at construction time.
        lastMinorCompactionTime = lastMajorCompactionTime =
            lastEntryLocationCompactionTime = System.currentTimeMillis();
    }