void internalTrimLedgers()

in managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java [2719:2930]


    void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
        if (!factory.isMetadataServiceAvailable()) {
            // Defer trimming of ledger if we cannot connect to metadata service
            promise.completeExceptionally(new MetaStoreException("Metadata service is not available"));
            return;
        }

        // Ensure only one trimming operation is active
        if (!trimmerMutex.tryLock()) {
            scheduleDeferredTrimming(isTruncate, promise);
            return;
        }

        List<LedgerInfo> ledgersToDelete = new ArrayList<>();
        List<LedgerInfo> offloadedLedgersToDelete = new ArrayList<>();
        Optional<OffloadPolicies> optionalOffloadPolicies = getOffloadPoliciesIfAppendable();
        synchronized (this) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(),
                        TOTAL_SIZE_UPDATER.get(this));
            }
            State currentState = STATE_UPDATER.get(this);
            if (currentState == State.Closed) {
                log.debug("[{}] Ignoring trimming request since the managed ledger was already closed", name);
                trimmerMutex.unlock();
                promise.completeExceptionally(new ManagedLedgerAlreadyClosedException("Can't trim closed ledger"));
                return;
            }
            // Allow for FencedForDeletion
            if (currentState == State.Fenced) {
                log.debug("[{}] Ignoring trimming request since the managed ledger was already fenced", name);
                trimmerMutex.unlock();
                promise.completeExceptionally(new ManagedLedgerFencedException("Can't trim fenced ledger"));
                return;
            }

            long slowestReaderLedgerId = -1;
            final LazyLoadableValue<Long> slowestNonDurationLedgerId =
                    new LazyLoadableValue(() -> getTheSlowestNonDurationReadPosition().getLedgerId());
            final long retentionSizeInMB = config.getRetentionSizeInMB();
            final long retentionTimeMs = config.getRetentionTimeMillis();
            final long totalSizeOfML = TOTAL_SIZE_UPDATER.get(this);
            if (!cursors.hasDurableCursors()) {
                // At this point the lastLedger will be pointing to the
                // ledger that has just been closed, therefore the +1 to
                // include lastLedger in the trimming.
                slowestReaderLedgerId = currentLedger.getId() + 1;
            } else {
                Position slowestReaderPosition = cursors.getSlowestReaderPosition();
                if (slowestReaderPosition != null) {
                    // The slowest reader position is the mark delete position.
                    // If the slowest reader position point the last entry in the ledger x,
                    // the slowestReaderLedgerId should be x + 1 and the ledger x could be deleted.
                    LedgerInfo ledgerInfo = ledgers.get(slowestReaderPosition.getLedgerId());
                    if (ledgerInfo != null && ledgerInfo.getLedgerId() != currentLedger.getId()
                            && ledgerInfo.getEntries() == slowestReaderPosition.getEntryId() + 1) {
                        slowestReaderLedgerId = slowestReaderPosition.getLedgerId() + 1;
                    } else {
                        slowestReaderLedgerId = slowestReaderPosition.getLedgerId();
                    }
                } else {
                    promise.completeExceptionally(new ManagedLedgerException("Couldn't find reader position"));
                    trimmerMutex.unlock();
                    return;
                }
            }

            if (log.isDebugEnabled()) {
                log.debug("[{}] Slowest consumer ledger id: {}", name, slowestReaderLedgerId);
            }

            long totalSizeToDelete = 0;
            // skip ledger if retention constraint met
            Iterator<LedgerInfo> ledgerInfoIterator =
                    ledgers.headMap(slowestReaderLedgerId, false).values().iterator();
            while (ledgerInfoIterator.hasNext()){
                LedgerInfo ls = ledgerInfoIterator.next();
                // currentLedger can not be deleted
                if (ls.getLedgerId() == currentLedger.getId()) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
                                ls.getLedgerId());
                    }
                    break;
                }
                // if truncate, all ledgers besides currentLedger are going to be deleted
                if (isTruncate) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Ledger {} will be truncated with ts {}",
                                name, ls.getLedgerId(), ls.getTimestamp());
                    }
                    ledgersToDelete.add(ls);
                    continue;
                }

                totalSizeToDelete += ls.getSize();
                boolean overRetentionQuota = isLedgerRetentionOverSizeQuota(retentionSizeInMB, totalSizeOfML,
                        totalSizeToDelete);
                boolean expired = hasLedgerRetentionExpired(retentionTimeMs, ls.getTimestamp());
                if (log.isDebugEnabled()) {
                    log.debug(
                            "[{}] Checking ledger {} -- time-old: {} sec -- "
                                    + "expired: {} -- over-quota: {} -- current-ledger: {}",
                            name, ls.getLedgerId(), (clock.millis() - ls.getTimestamp()) / 1000.0, expired,
                            overRetentionQuota, currentLedger.getId());
                }

                if (expired || overRetentionQuota) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Ledger {} has expired or over quota, expired is: {}, ts: {}, "
                                        + "overRetentionQuota is: {}, ledge size: {}",
                                name, ls.getLedgerId(), expired, ls.getTimestamp(), overRetentionQuota, ls.getSize());
                    }
                    ledgersToDelete.add(ls);
                } else {
                    // once retention constraint has been met, skip check
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId());
                    }
                    releaseReadHandleIfNoLongerRead(ls.getLedgerId(), slowestNonDurationLedgerId.getValue());
                    break;
                }
            }

            while (ledgerInfoIterator.hasNext()) {
                LedgerInfo ls = ledgerInfoIterator.next();
                if (!releaseReadHandleIfNoLongerRead(ls.getLedgerId(), slowestNonDurationLedgerId.getValue())) {
                    break;
                }
            }

            for (LedgerInfo ls : ledgers.values()) {
                if (isOffloadedNeedsDelete(ls.getOffloadContext(), optionalOffloadPolicies)
                        && !ledgersToDelete.contains(ls)) {
                    log.debug("[{}] Ledger {} has been offloaded, bookkeeper ledger needs to be deleted", name,
                            ls.getLedgerId());
                    offloadedLedgersToDelete.add(ls);
                }
            }

            if (ledgersToDelete.isEmpty() && offloadedLedgersToDelete.isEmpty()) {
                trimmerMutex.unlock();
                promise.complete(null);
                return;
            }

            if (currentState == State.CreatingLedger // Give up now and schedule a new trimming
                    || !metadataMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
                scheduleDeferredTrimming(isTruncate, promise);
                trimmerMutex.unlock();
                return;
            }

            try {
                advanceCursorsIfNecessary(ledgersToDelete);
            } catch (LedgerNotExistException e) {
                log.info("First non deleted Ledger is not found, stop trimming");
                metadataMutex.unlock();
                trimmerMutex.unlock();
                return;
            }

            doDeleteLedgers(ledgersToDelete);

            for (LedgerInfo ls : offloadedLedgersToDelete) {
                LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
                newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
                String driverName = OffloadUtils.getOffloadDriverName(ls,
                        config.getLedgerOffloader().getOffloadDriverName());
                Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls,
                        config.getLedgerOffloader().getOffloadDriverMetadata());
                OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata);
                ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
            }

            if (log.isDebugEnabled()) {
                log.debug("[{}] Updating of ledgers list after trimming", name);
            }

            store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
                @Override
                public void operationComplete(Void result, Stat stat) {
                    log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
                            TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
                    ledgersStat = stat;
                    metadataMutex.unlock();
                    trimmerMutex.unlock();

                    for (LedgerInfo ls : ledgersToDelete) {
                        log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
                        asyncDeleteLedger(ls.getLedgerId(), ls);
                    }
                    for (LedgerInfo ls : offloadedLedgersToDelete) {
                        log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
                                ls.getSize());
                        asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
                    }
                    promise.complete(null);
                }

                @Override
                public void operationFailed(MetaStoreException e) {
                    log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
                    metadataMutex.unlock();
                    trimmerMutex.unlock();
                    handleBadVersion(e);

                    promise.completeExceptionally(e);
                }
            });
        }
    }