public void calculateCursorBacklogs()

in managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java [1203:1420]


    public void calculateCursorBacklogs(final TopicName topicName,
                                         final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers,
                                         final PersistentOfflineTopicStats offlineTopicStats, boolean accurate,
                                        BookKeeper.DigestType digestType, byte[] password) throws Exception {
        if (ledgers.isEmpty()) {
            return;
        }
        String managedLedgerName = topicName.getPersistenceNamingEncoding();
        MetaStore store = getMetaStore();
        BookKeeper bk = getBookKeeper().get();
        final CountDownLatch allCursorsCounter = new CountDownLatch(1);
        final long errorInReadingCursor = -1;
        final var ledgerRetryMap = new ConcurrentHashMap<String, Long>();

        final MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.lastEntry().getValue();
        final Position lastLedgerPosition =
                PositionFactory.create(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Last ledger position {}", managedLedgerName, lastLedgerPosition);
        }

        store.getCursors(managedLedgerName, new MetaStore.MetaStoreCallback<List<String>>() {
            @Override
            public void operationComplete(List<String> cursors, Stat v) {
                // Load existing cursors
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Found {} cursors", managedLedgerName, cursors.size());
                }

                if (cursors.isEmpty()) {
                    allCursorsCounter.countDown();
                    return;
                }

                final CountDownLatch cursorCounter = new CountDownLatch(cursors.size());

                for (final String cursorName : cursors) {
                    // determine subscription position from cursor ledger
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Loading cursor {}", managedLedgerName, cursorName);
                    }

                    AsyncCallback.OpenCallback cursorLedgerOpenCb = (rc, lh, ctx1) -> {
                        long ledgerId = lh.getId();
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Opened cursor ledger {} for cursor {}. rc={}", managedLedgerName, ledgerId,
                                    cursorName, rc);
                        }
                        if (rc != BKException.Code.OK) {
                            log.warn("[{}] Error opening metadata ledger {} for cursor {}: {}", managedLedgerName,
                                    ledgerId, cursorName, BKException.getMessage(rc));
                            cursorCounter.countDown();
                            return;
                        }
                        long lac = lh.getLastAddConfirmed();
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Cursor {} LAC {} read from ledger {}", managedLedgerName, cursorName, lac,
                                    ledgerId);
                        }

                        if (lac == LedgerHandle.INVALID_ENTRY_ID) {
                            // save the ledger id and cursor to retry outside of this call back
                            // since we are trying to read the same cursor ledger, we will block until
                            // this current callback completes, since an attempt to read the entry
                            // will block behind this current operation to complete
                            ledgerRetryMap.put(cursorName, ledgerId);
                            log.info("[{}] Cursor {} LAC {} read from ledger {}", managedLedgerName, cursorName, lac,
                                    ledgerId);
                            cursorCounter.countDown();
                            return;
                        }
                        final long entryId = lac;
                        // read last acked message position for subscription
                        lh.asyncReadEntries(entryId, entryId, new AsyncCallback.ReadCallback() {
                            @Override
                            public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
                                                     Object ctx) {
                                try {
                                    if (log.isDebugEnabled()) {
                                        log.debug("readComplete rc={} entryId={}", rc, entryId);
                                    }
                                    if (rc != BKException.Code.OK) {
                                        log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}",
                                                managedLedgerName, ledgerId, cursorName, BKException.getMessage(rc));
                                        // indicate that this cursor should be excluded
                                        offlineTopicStats.addCursorDetails(cursorName, errorInReadingCursor,
                                                lh.getId());
                                    } else {
                                        LedgerEntry entry = seq.nextElement();
                                        MLDataFormats.PositionInfo positionInfo;
                                        try {
                                            positionInfo = MLDataFormats.PositionInfo.parseFrom(entry.getEntry());
                                        } catch (InvalidProtocolBufferException e) {
                                            log.warn(
                                                    "[{}] Error reading position from metadata ledger {} for cursor "
                                                            + "{}: {}", managedLedgerName, ledgerId, cursorName, e);
                                            offlineTopicStats.addCursorDetails(cursorName, errorInReadingCursor,
                                                    lh.getId());
                                            return;
                                        }
                                        final Position lastAckedMessagePosition =
                                                PositionFactory.create(positionInfo.getLedgerId(),
                                                        positionInfo.getEntryId());
                                        if (log.isDebugEnabled()) {
                                            log.debug("[{}] Cursor {} MD {} read last ledger position {}",
                                                    managedLedgerName, cursorName, lastAckedMessagePosition,
                                                    lastLedgerPosition);
                                        }
                                        // calculate cursor backlog
                                        Range<Position> range = Range.openClosed(lastAckedMessagePosition,
                                                lastLedgerPosition);
                                        if (log.isDebugEnabled()) {
                                            log.debug("[{}] Calculating backlog for cursor {} using range {}",
                                                    managedLedgerName, cursorName, range);
                                        }
                                        long cursorBacklog = getNumberOfEntries(range, ledgers);
                                        offlineTopicStats.messageBacklog += cursorBacklog;
                                        offlineTopicStats.addCursorDetails(cursorName, cursorBacklog, lh.getId());
                                    }
                                } finally {
                                    cursorCounter.countDown();
                                }
                            }
                        }, null);

                    }; // end of cursor meta read callback

                    store.asyncGetCursorInfo(managedLedgerName, cursorName,
                            new MetaStore.MetaStoreCallback<MLDataFormats.ManagedCursorInfo>() {
                                @Override
                                public void operationComplete(MLDataFormats.ManagedCursorInfo info,
                                                              Stat stat) {
                                    long cursorLedgerId = info.getCursorsLedgerId();
                                    if (log.isDebugEnabled()) {
                                        log.debug("[{}] Cursor {} meta-data read ledger id {}", managedLedgerName,
                                                cursorName, cursorLedgerId);
                                    }
                                    if (cursorLedgerId != -1) {
                                        bk.asyncOpenLedgerNoRecovery(cursorLedgerId, digestType, password,
                                                cursorLedgerOpenCb, null);
                                    } else {
                                        Position lastAckedMessagePosition = PositionFactory.create(
                                                info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());
                                        Range<Position> range = Range.openClosed(lastAckedMessagePosition,
                                                lastLedgerPosition);
                                        if (log.isDebugEnabled()) {
                                            log.debug("[{}] Calculating backlog for cursor {} using range {}",
                                                    managedLedgerName, cursorName, range);
                                        }
                                        long cursorBacklog = getNumberOfEntries(range, ledgers);
                                        offlineTopicStats.messageBacklog += cursorBacklog;
                                        offlineTopicStats.addCursorDetails(cursorName, cursorBacklog, cursorLedgerId);
                                        cursorCounter.countDown();
                                    }

                                }

                                @Override
                                public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                                    log.warn("[{}] Unable to obtain cursor ledger for cursor {}: {}", managedLedgerName,
                                            cursorName, e);
                                    cursorCounter.countDown();
                                }
                            });
                } // for every cursor find backlog
                try {
                    if (accurate) {
                        cursorCounter.await();
                    } else {
                        cursorCounter.await(META_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    }
                } catch (Exception e) {
                    log.warn("[{}] Error reading subscription positions{}", managedLedgerName, e);
                } finally {
                    allCursorsCounter.countDown();
                }
            }

            @Override
            public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                log.warn("[{}] Failed to get the cursors list", managedLedgerName, e);
                allCursorsCounter.countDown();
            }
        });
        if (accurate) {
            allCursorsCounter.await();
        } else {
            allCursorsCounter.await(META_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        }

        // go through ledgers where LAC was -1
        if (accurate && ledgerRetryMap.size() > 0) {
            ledgerRetryMap.forEach((cursorName, ledgerId) -> {
                if (log.isDebugEnabled()) {
                    log.debug("Cursor {} Ledger {} Trying to obtain MD from BkAdmin", cursorName, ledgerId);
                }
                Position lastAckedMessagePosition = tryGetMDPosition(bk, ledgerId, cursorName);
                if (lastAckedMessagePosition == null) {
                    log.warn("[{}] Cursor {} read from ledger {}. Unable to determine cursor position",
                            managedLedgerName, cursorName, ledgerId);
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Cursor {} read from ledger using bk admin {}. position {}", managedLedgerName,
                                cursorName, ledgerId, lastAckedMessagePosition);
                    }
                    // calculate cursor backlog
                    Range<Position> range = Range.openClosed(lastAckedMessagePosition, lastLedgerPosition);
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Calculating backlog for cursor {} using range {}", managedLedgerName,
                                cursorName, range);
                    }
                    long cursorBacklog = getNumberOfEntries(range, ledgers);
                    offlineTopicStats.messageBacklog += cursorBacklog;
                    offlineTopicStats.addCursorDetails(cursorName, cursorBacklog, ledgerId);
                }
            });
        }
    }