in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java [2813:2969]
/**
 * Asynchronously assembles the internal statistics of this topic.
 *
 * <p>The result combines the managed-ledger internal stats, the compacted-topic ledger (if a
 * compacted topic context is available), per-cursor stats enriched with the pending-read state of
 * the matching subscription's dispatcher, and the ledgers used by the schema store for this
 * topic's schema (only when the schema storage is a {@code BookkeeperSchemaStorage}).
 *
 * @param includeLedgerMetadata forwarded to the managed ledger stats lookup; when {@code true}
 *                              each schema ledger entry also carries the ledger metadata rendered
 *                              through {@code toSafeString()}
 * @return a future completed with the populated stats, or completed exceptionally when any of the
 *         lookups fails; schema ledgers that no longer exist are skipped instead of failing
 */
public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata) {
    CompletableFuture<PersistentTopicInternalStats> statFuture = new CompletableFuture<>();
    ledger.getManagedLedgerInternalStats(includeLedgerMetadata)
        .thenCombine(getCompactedTopicContextAsync(), (ledgerInternalStats, compactedTopicContext) -> {
            PersistentTopicInternalStats stats = new PersistentTopicInternalStats();

            // Managed-ledger level counters are copied over as-is.
            stats.entriesAddedCounter = ledgerInternalStats.getEntriesAddedCounter();
            stats.numberOfEntries = ledgerInternalStats.getNumberOfEntries();
            stats.totalSize = ledgerInternalStats.getTotalSize();
            stats.currentLedgerEntries = ledgerInternalStats.getCurrentLedgerEntries();
            stats.currentLedgerSize = ledgerInternalStats.getCurrentLedgerSize();
            stats.lastLedgerCreatedTimestamp = ledgerInternalStats.getLastLedgerCreatedTimestamp();
            stats.lastLedgerCreationFailureTimestamp =
                    ledgerInternalStats.getLastLedgerCreationFailureTimestamp();
            stats.waitingCursorsCount = ledgerInternalStats.getWaitingCursorsCount();
            stats.pendingAddEntriesCount = ledgerInternalStats.getPendingAddEntriesCount();
            stats.lastConfirmedEntry = ledgerInternalStats.getLastConfirmedEntry();
            stats.state = ledgerInternalStats.getState();
            stats.ledgers = ledgerInternalStats.ledgers;

            // Add ledger info for compacted topic ledger if exist.
            // All fields stay at -1 when there is no compacted topic context.
            LedgerInfo info = new LedgerInfo();
            info.ledgerId = -1;
            info.entries = -1;
            info.size = -1;
            if (compactedTopicContext != null) {
                info.ledgerId = compactedTopicContext.getLedger().getId();
                // Last-add-confirmed is the id of the last entry, hence +1 for the entry count.
                info.entries = compactedTopicContext.getLedger().getLastAddConfirmed() + 1;
                info.size = compactedTopicContext.getLedger().getLength();
            }
            stats.compactedLedger = info;

            // Per-cursor stats, keyed by the cursor name.
            stats.cursors = new HashMap<>();
            ledger.getCursors().forEach(c -> {
                CursorStats cs = new CursorStats();
                CursorStats cursorInternalStats = c.getCursorStats();
                cs.markDeletePosition = cursorInternalStats.getMarkDeletePosition();
                cs.readPosition = cursorInternalStats.getReadPosition();
                cs.waitingReadOp = cursorInternalStats.isWaitingReadOp();
                cs.pendingReadOps = cursorInternalStats.getPendingReadOps();
                cs.messagesConsumedCounter = cursorInternalStats.getMessagesConsumedCounter();
                cs.cursorLedger = cursorInternalStats.getCursorLedger();
                cs.cursorLedgerLastEntry = cursorInternalStats.getCursorLedgerLastEntry();
                cs.individuallyDeletedMessages = cursorInternalStats.getIndividuallyDeletedMessages();
                cs.lastLedgerSwitchTimestamp = cursorInternalStats.getLastLedgerSwitchTimestamp();
                cs.state = cursorInternalStats.getState();
                cs.active = cursorInternalStats.isActive();
                cs.numberOfEntriesSinceFirstNotAckedMessage =
                        cursorInternalStats.getNumberOfEntriesSinceFirstNotAckedMessage();
                cs.totalNonContiguousDeletedMessagesRange =
                        cursorInternalStats.getTotalNonContiguousDeletedMessagesRange();
                cs.properties = cursorInternalStats.getProperties();

                // subscription metrics: the subscription map is looked up by the decoded cursor name.
                PersistentSubscription sub = subscriptions.get(Codec.decode(c.getName()));
                if (sub != null) {
                    if (sub.getDispatcher() instanceof AbstractPersistentDispatcherMultipleConsumers) {
                        AbstractPersistentDispatcherMultipleConsumers dispatcher =
                                (AbstractPersistentDispatcherMultipleConsumers) sub.getDispatcher();
                        cs.subscriptionHavePendingRead = dispatcher.isHavePendingRead();
                        cs.subscriptionHavePendingReplayRead = dispatcher.isHavePendingReplayRead();
                    } else if (sub.getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) {
                        PersistentDispatcherSingleActiveConsumer dispatcher =
                                (PersistentDispatcherSingleActiveConsumer) sub.getDispatcher();
                        cs.subscriptionHavePendingRead = dispatcher.havePendingRead;
                    }
                }
                stats.cursors.put(c.getName(), cs);
            });

            //Schema store ledgers
            String schemaId;
            try {
                schemaId = TopicName.get(topic).getSchemaName();
            } catch (Throwable t) {
                statFuture.completeExceptionally(t);
                return null;
            }

            CompletableFuture<Void> schemaStoreLedgersFuture = new CompletableFuture<>();
            // Synchronized because entries are appended from the per-ledger metadata callbacks.
            stats.schemaLedgers = Collections.synchronizedList(new ArrayList<>());
            if (brokerService.getPulsar().getSchemaStorage() != null
                    && brokerService.getPulsar().getSchemaStorage() instanceof BookkeeperSchemaStorage) {
                ((BookkeeperSchemaStorage) brokerService.getPulsar().getSchemaStorage())
                    .getStoreLedgerIdsBySchemaId(schemaId)
                    .thenAccept(ledgers -> {
                        List<CompletableFuture<Void>> getLedgerMetadataFutures = new ArrayList<>();
                        ledgers.forEach(ledgerId -> {
                            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
                            getLedgerMetadataFutures.add(completableFuture);
                            CompletableFuture<LedgerMetadata> metadataFuture = null;
                            try {
                                metadataFuture = brokerService.getPulsar().getBookKeeperClient()
                                        .getLedgerMetadata(ledgerId);
                            } catch (NullPointerException e) {
                                // related to bookkeeper issue https://github.com/apache/bookkeeper/issues/2741
                                if (log.isDebugEnabled()) {
                                    log.debug("[{}] Failed to get ledger metadata for the schema ledger {}",
                                            topic, ledgerId, e);
                                }
                            }
                            if (metadataFuture != null) {
                                metadataFuture.thenAccept(metadata -> {
                                    LedgerInfo schemaLedgerInfo = new LedgerInfo();
                                    schemaLedgerInfo.ledgerId = metadata.getLedgerId();
                                    schemaLedgerInfo.entries = metadata.getLastEntryId() + 1;
                                    schemaLedgerInfo.size = metadata.getLength();
                                    if (includeLedgerMetadata) {
                                        // Attach the metadata to the schema ledger entry being built,
                                        // not to the compacted-topic ledger info.
                                        schemaLedgerInfo.metadata = metadata.toSafeString();
                                    }
                                    stats.schemaLedgers.add(schemaLedgerInfo);
                                    completableFuture.complete(null);
                                }).exceptionally(e -> {
                                    log.error("[{}] Failed to get ledger metadata for the schema ledger {}",
                                            topic, ledgerId, e);
                                    // A schema ledger that no longer exists is skipped rather than
                                    // failing the whole stats request.
                                    if ((e.getCause() instanceof BKNoSuchLedgerExistsOnMetadataServerException)
                                            || (e.getCause() instanceof BKNoSuchLedgerExistsException)) {
                                        completableFuture.complete(null);
                                        return null;
                                    }
                                    completableFuture.completeExceptionally(e);
                                    return null;
                                });
                            } else {
                                // The metadata lookup threw the NPE handled above; skip this ledger.
                                completableFuture.complete(null);
                            }
                        });
                        FutureUtil.waitForAll(getLedgerMetadataFutures).thenRun(() -> {
                            schemaStoreLedgersFuture.complete(null);
                        }).exceptionally(e -> {
                            schemaStoreLedgersFuture.completeExceptionally(e);
                            return null;
                        });
                    }).exceptionally(e -> {
                        schemaStoreLedgersFuture.completeExceptionally(e);
                        return null;
                    });
            } else {
                // No BookKeeper-backed schema storage: there are no schema ledgers to report.
                schemaStoreLedgersFuture.complete(null);
            }

            // The caller-visible future is only completed once the schema ledger lookups are done.
            schemaStoreLedgersFuture.whenComplete((r, ex) -> {
                if (ex != null) {
                    statFuture.completeExceptionally(ex);
                } else {
                    statFuture.complete(stats);
                }
            });
            return null;
        })
        .exceptionally(ex -> {
            // Covers failures of either combined lookup as well as exceptions thrown while
            // assembling the stats above.
            statFuture.completeExceptionally(ex);
            return null;
        });
    return statFuture;
}