public CompletableFuture getInternalStats()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java [2813:2969]


    /**
     * Asynchronously assembles the internal stats of this topic.
     *
     * <p>The result combines the managed-ledger internal stats, the compacted-topic ledger (if any),
     * one {@link CursorStats} entry per managed cursor, and the ledgers backing the topic's schema
     * when the schema storage is a {@link BookkeeperSchemaStorage}.
     *
     * @param includeLedgerMetadata forwarded to the managed ledger when fetching its internal stats;
     *                              when {@code true}, the metadata string of every schema ledger is
     *                              also attached to its {@link LedgerInfo}
     * @return a future completed with the populated stats, or completed exceptionally when the
     *         managed-ledger stats, the compacted topic context, the schema name or the schema
     *         ledger lookup fails
     */
    public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata) {

        CompletableFuture<PersistentTopicInternalStats> statFuture = new CompletableFuture<>();

        ledger.getManagedLedgerInternalStats(includeLedgerMetadata)
            .thenCombine(getCompactedTopicContextAsync(), (ledgerInternalStats, compactedTopicContext) -> {
                PersistentTopicInternalStats stats = new PersistentTopicInternalStats();
                stats.entriesAddedCounter = ledgerInternalStats.getEntriesAddedCounter();
                stats.numberOfEntries = ledgerInternalStats.getNumberOfEntries();
                stats.totalSize = ledgerInternalStats.getTotalSize();
                stats.currentLedgerEntries = ledgerInternalStats.getCurrentLedgerEntries();
                stats.currentLedgerSize = ledgerInternalStats.getCurrentLedgerSize();
                stats.lastLedgerCreatedTimestamp = ledgerInternalStats.getLastLedgerCreatedTimestamp();
                stats.lastLedgerCreationFailureTimestamp = ledgerInternalStats.getLastLedgerCreationFailureTimestamp();
                stats.waitingCursorsCount = ledgerInternalStats.getWaitingCursorsCount();
                stats.pendingAddEntriesCount = ledgerInternalStats.getPendingAddEntriesCount();
                stats.lastConfirmedEntry = ledgerInternalStats.getLastConfirmedEntry();
                stats.state = ledgerInternalStats.getState();
                stats.ledgers = ledgerInternalStats.ledgers;

                // Compacted topic ledger: -1 in every field means "no compacted ledger".
                LedgerInfo info = new LedgerInfo();
                info.ledgerId = -1;
                info.entries = -1;
                info.size = -1;
                if (compactedTopicContext != null) {
                    info.ledgerId = compactedTopicContext.getLedger().getId();
                    // Entry ids are zero-based, so the entry count is the last confirmed id plus one.
                    info.entries = compactedTopicContext.getLedger().getLastAddConfirmed() + 1;
                    info.size = compactedTopicContext.getLedger().getLength();
                }

                stats.compactedLedger = info;

                // Per-cursor stats, keyed by the cursor name as returned by the managed cursor.
                stats.cursors = new HashMap<>();
                ledger.getCursors().forEach(c -> {
                    CursorStats cs = new CursorStats();

                    CursorStats cursorInternalStats = c.getCursorStats();
                    cs.markDeletePosition = cursorInternalStats.getMarkDeletePosition();
                    cs.readPosition = cursorInternalStats.getReadPosition();
                    cs.waitingReadOp = cursorInternalStats.isWaitingReadOp();
                    cs.pendingReadOps = cursorInternalStats.getPendingReadOps();
                    cs.messagesConsumedCounter = cursorInternalStats.getMessagesConsumedCounter();
                    cs.cursorLedger = cursorInternalStats.getCursorLedger();
                    cs.cursorLedgerLastEntry = cursorInternalStats.getCursorLedgerLastEntry();
                    cs.individuallyDeletedMessages = cursorInternalStats.getIndividuallyDeletedMessages();
                    cs.lastLedgerSwitchTimestamp = cursorInternalStats.getLastLedgerSwitchTimestamp();
                    cs.state = cursorInternalStats.getState();
                    cs.active = cursorInternalStats.isActive();
                    cs.numberOfEntriesSinceFirstNotAckedMessage =
                        cursorInternalStats.getNumberOfEntriesSinceFirstNotAckedMessage();
                    cs.totalNonContiguousDeletedMessagesRange =
                        cursorInternalStats.getTotalNonContiguousDeletedMessagesRange();
                    cs.properties = cursorInternalStats.getProperties();
                    // Subscription metrics: subscriptions are looked up by the decoded cursor name.
                    PersistentSubscription sub = subscriptions.get(Codec.decode(c.getName()));
                    if (sub != null) {
                        if (sub.getDispatcher() instanceof AbstractPersistentDispatcherMultipleConsumers) {
                            AbstractPersistentDispatcherMultipleConsumers dispatcher =
                                (AbstractPersistentDispatcherMultipleConsumers) sub.getDispatcher();
                            cs.subscriptionHavePendingRead = dispatcher.isHavePendingRead();
                            cs.subscriptionHavePendingReplayRead = dispatcher.isHavePendingReplayRead();
                        } else if (sub.getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) {
                            PersistentDispatcherSingleActiveConsumer dispatcher =
                                (PersistentDispatcherSingleActiveConsumer) sub.getDispatcher();
                            cs.subscriptionHavePendingRead = dispatcher.havePendingRead;
                        }
                    }
                    stats.cursors.put(c.getName(), cs);
                });

                // Schema store ledgers
                String schemaId;
                try {
                    schemaId = TopicName.get(topic).getSchemaName();
                } catch (Throwable t) {
                    statFuture.completeExceptionally(t);
                    return null;
                }

                CompletableFuture<Void> schemaStoreLedgersFuture = new CompletableFuture<>();
                // Synchronized because the per-ledger metadata callbacks below add to it and may
                // run concurrently.
                stats.schemaLedgers = Collections.synchronizedList(new ArrayList<>());
                // instanceof is already false for null, so no separate null check is required.
                if (brokerService.getPulsar().getSchemaStorage() instanceof BookkeeperSchemaStorage) {
                    ((BookkeeperSchemaStorage) brokerService.getPulsar().getSchemaStorage())
                        .getStoreLedgerIdsBySchemaId(schemaId)
                        .thenAccept(ledgers -> {
                            List<CompletableFuture<Void>> getLedgerMetadataFutures = new ArrayList<>();
                            ledgers.forEach(ledgerId -> {
                                CompletableFuture<Void> completableFuture = new CompletableFuture<>();
                                getLedgerMetadataFutures.add(completableFuture);
                                CompletableFuture<LedgerMetadata> metadataFuture = null;
                                try {
                                    metadataFuture = brokerService.getPulsar().getBookKeeperClient()
                                        .getLedgerMetadata(ledgerId);
                                } catch (NullPointerException e) {
                                    // related to bookkeeper issue https://github.com/apache/bookkeeper/issues/2741
                                    if (log.isDebugEnabled()) {
                                        log.debug("[{}] Failed to get ledger metadata for the schema ledger {}",
                                            topic, ledgerId, e);
                                    }
                                }
                                if (metadataFuture != null) {
                                    metadataFuture.thenAccept(metadata -> {
                                        LedgerInfo schemaLedgerInfo = new LedgerInfo();
                                        schemaLedgerInfo.ledgerId = metadata.getLedgerId();
                                        schemaLedgerInfo.entries = metadata.getLastEntryId() + 1;
                                        schemaLedgerInfo.size = metadata.getLength();
                                        if (includeLedgerMetadata) {
                                            // Attach the metadata to this schema ledger's own info;
                                            // writing it to the compacted-ledger info would clobber
                                            // stats.compactedLedger and leave this entry empty.
                                            schemaLedgerInfo.metadata = metadata.toSafeString();
                                        }
                                        stats.schemaLedgers.add(schemaLedgerInfo);
                                        completableFuture.complete(null);
                                    }).exceptionally(e -> {
                                        log.error("[{}] Failed to get ledger metadata for the schema ledger {}",
                                            topic, ledgerId, e);
                                        // A schema ledger that no longer exists is skipped rather
                                        // than failing the whole stats request.
                                        if ((e.getCause() instanceof BKNoSuchLedgerExistsOnMetadataServerException)
                                            || (e.getCause() instanceof BKNoSuchLedgerExistsException)) {
                                            completableFuture.complete(null);
                                            return null;
                                        }
                                        completableFuture.completeExceptionally(e);
                                        return null;
                                    });
                                } else {
                                    // The metadata lookup threw synchronously; skip this ledger.
                                    completableFuture.complete(null);
                                }
                            });
                            FutureUtil.waitForAll(getLedgerMetadataFutures).thenRun(() -> {
                                schemaStoreLedgersFuture.complete(null);
                            }).exceptionally(e -> {
                                schemaStoreLedgersFuture.completeExceptionally(e);
                                return null;
                            });
                        }).exceptionally(e -> {
                            schemaStoreLedgersFuture.completeExceptionally(e);
                            return null;
                        });
                } else {
                    schemaStoreLedgersFuture.complete(null);
                }
                // The caller-visible future is completed only once the schema ledger lookup settles.
                schemaStoreLedgersFuture.whenComplete((r, ex) -> {
                    if (ex != null) {
                        statFuture.completeExceptionally(ex);
                    } else {
                        statFuture.complete(stats);
                    }
                });
                return null;
            })
            .exceptionally(ex -> {
                // Covers failures of either combined future and anything thrown while building stats.
                statFuture.completeExceptionally(ex);
                return null;
            });
        return statFuture;
    }