void doCompactEntryLogs(double threshold, long maxTimeMillis)

in bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java [628:736]


    void doCompactEntryLogs(double threshold, long maxTimeMillis) throws EntryLogMetadataMapException {
        LOG.info("Do compaction to compact those files lower than {}", threshold);

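        // Group entry logs into usage buckets (one bucket per 10% of usage) so that the
        // least-used logs can be compacted first.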
        final int numBuckets = ENTRY_LOG_USAGE_SEGMENT_COUNT;
        int[] entryLogUsageBuckets = new int[numBuckets];
        int[] compactedBuckets = new int[numBuckets];

        ArrayList<LinkedList<Long>> compactableBuckets = new ArrayList<>(numBuckets);
        for (int i = 0; i < numBuckets; i++) {
            compactableBuckets.add(new LinkedList<>());
        }

        long start = System.currentTimeMillis();
        MutableLong end = new MutableLong(start);
        MutableLong timeDiff = new MutableLong(0);

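        // First pass: walk all entry log metadata, build the usage histogram, and queue the
        // ids of logs whose usage is below the threshold into their bucket.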
        entryLogMetaMap.forEach((entryLogId, meta) -> {
            double usage = meta.getUsage();
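            // When GC sizes logs by the target entry log size, measure usage against the
            // configured size limit (if larger than the actual size), so under-filled logs
            // rank as better compaction candidates.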
            if (conf.isUseTargetEntryLogSizeForGc() && usage < 1.0d) {
                usage = (double) meta.getRemainingSize() / Math.max(meta.getTotalSize(), conf.getEntryLogSizeLimit());
            }
            int bucketIndex = calculateUsageIndex(numBuckets, usage);
            entryLogUsageBuckets[bucketIndex]++;

            if (timeDiff.getValue() < maxTimeMillis) {
                end.setValue(System.currentTimeMillis());
                timeDiff.setValue(end.getValue() - start);
            }
            if ((usage >= threshold
                || (maxTimeMillis > 0 && timeDiff.getValue() >= maxTimeMillis)
                || !running)) {
                // We allow the usage limit calculation to continue so that we get an accurate
                // report of where the usage was prior to running compaction.
                return;
            }

            compactableBuckets.get(bucketIndex).add(meta.getEntryLogId());
        });
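        // Publish the pre-compaction usage distribution to the GC stats.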
        currentEntryLogUsageBuckets = entryLogUsageBuckets;
        gcStats.setEntryLogUsageBuckets(currentEntryLogUsageBuckets);
        LOG.info(
                "Compaction: entry log usage buckets before compaction [10% 20% 30% 40% 50% 60% 70% 80% 90% 100%] = {}",
                entryLogUsageBuckets);

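        // Only buckets at or below the threshold hold candidates; count them up front so
        // progress can be reported against the total.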
        final int maxBucket = calculateUsageIndex(numBuckets, threshold);
        int totalEntryLogIds = 0;
        for (int currBucket = 0; currBucket <= maxBucket; currBucket++) {
            totalEntryLogIds += compactableBuckets.get(currBucket).size();
        }
        long lastPrintTimestamp = 0;
        AtomicInteger processedEntryLogCnt = new AtomicInteger(0);

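        // Second pass: compact candidates from the least-used bucket upwards, bailing out
        // once the time budget is spent or the GC thread is no longer running.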
        stopCompaction:
        for (int currBucket = 0; currBucket <= maxBucket; currBucket++) {
            LinkedList<Long> entryLogIds = compactableBuckets.get(currBucket);
            while (!entryLogIds.isEmpty()) {
                if (timeDiff.getValue() < maxTimeMillis) {
                    end.setValue(System.currentTimeMillis());
                    timeDiff.setValue(end.getValue() - start);
                }

                if ((maxTimeMillis > 0 && timeDiff.getValue() >= maxTimeMillis) || !running) {
                    // We allow the usage limit calculation to continue so that we get an accurate
                    // report of where the usage was prior to running compaction.
                    break stopCompaction;
                }

                final int bucketIndex = currBucket;
                final long logId = entryLogIds.remove();
                if (System.currentTimeMillis() - lastPrintTimestamp >= MINUTE) {
                    lastPrintTimestamp = System.currentTimeMillis();
                    LOG.info("Compaction progress {} / {}, current compaction entryLogId: {}",
                        processedEntryLogCnt.get(), totalEntryLogIds, logId);
                }
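                // Re-resolve the metadata under the map so a log deleted concurrently is skipped.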
                entryLogMetaMap.forKey(logId, (entryLogId, meta) -> {
                    if (meta == null) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Metadata for entry log {} already deleted", logId);
                        }
                        return;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Compacting entry log {} with usage {} below threshold {}",
                                meta.getEntryLogId(), meta.getUsage(), threshold);
                    }

                    long priorRemainingSize = meta.getRemainingSize();
                    compactEntryLog(meta);
                    gcStats.getReclaimedSpaceViaCompaction().addCount(meta.getTotalSize() - priorRemainingSize);
                    compactedBuckets[bucketIndex]++;
                    processedEntryLogCnt.getAndIncrement();
                });
            }
        }

        if (LOG.isDebugEnabled()) {
            if (!running) {
                LOG.debug("Compaction exited due to gc not running");
            }
            if (maxTimeMillis > 0 && timeDiff.getValue() > maxTimeMillis) {
                LOG.debug("Compaction ran for {}ms but was limited by {}ms", timeDiff, maxTimeMillis);
            }
        }
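        // Record what fraction of the scanned entry logs was actually compacted this run.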
        int totalEntryLogNum = Arrays.stream(entryLogUsageBuckets).sum();
        int compactedEntryLogNum = Arrays.stream(compactedBuckets).sum();
        this.entryLogCompactRatio = totalEntryLogNum == 0 ? 0 : (double) compactedEntryLogNum / totalEntryLogNum;
        LOG.info("Compaction: entry log usage buckets[10% 20% 30% 40% 50% 60% 70% 80% 90% 100%] = {}, compacted {}, "
                + "compacted entry log ratio {}", entryLogUsageBuckets, compactedBuckets, entryLogCompactRatio);
    }