in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java [2393:2629]
public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats,
StatsOutputStream topicStatsStream,
ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {
this.publishRateLimitedTimes = 0;
TopicStatsHelper topicStatsHelper = threadLocalTopicStats.get();
topicStatsHelper.reset();
replicators.forEach((region, replicator) -> replicator.updateRates());
final MutableInt producerCount = new MutableInt();
topicStatsStream.startObject(topic);
// start publisher stats
topicStatsStream.startList("publishers");
producers.values().forEach(producer -> {
producer.updateRates();
PublisherStatsImpl publisherStats = producer.getStats();
topicStatsHelper.aggMsgRateIn += publisherStats.msgRateIn;
topicStatsHelper.aggMsgThroughputIn += publisherStats.msgThroughputIn;
if (producer.isRemote()) {
topicStatsHelper.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
} else {
// Exclude producers for replication from "publishers" and "producerCount"
producerCount.increment();
if (hydratePublishers) {
StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
}
}
});
topicStatsStream.endList();
nsStats.producerCount += producerCount.intValue();
bundleStats.producerCount += producerCount.intValue();
// if publish-rate increases (eg: 0 to 1K) then pick max publish-rate and if publish-rate decreases then keep
// average rate.
lastUpdatedAvgPublishRateInMsg = topicStatsHelper.aggMsgRateIn > lastUpdatedAvgPublishRateInMsg
? topicStatsHelper.aggMsgRateIn
: (topicStatsHelper.aggMsgRateIn + lastUpdatedAvgPublishRateInMsg) / 2;
lastUpdatedAvgPublishRateInByte = topicStatsHelper.aggMsgThroughputIn > lastUpdatedAvgPublishRateInByte
? topicStatsHelper.aggMsgThroughputIn
: (topicStatsHelper.aggMsgThroughputIn + lastUpdatedAvgPublishRateInByte) / 2;
// Start replicator stats
topicStatsStream.startObject("replication");
nsStats.replicatorCount += topicStatsHelper.remotePublishersStats.size();
replicators.forEach((cluster, replicator) -> {
// Update replicator cursor state
try {
((PersistentReplicator) replicator).updateCursorState();
} catch (Exception e) {
log.warn("[{}] Failed to update cursor state ", topic, e);
}
// Update replicator stats
ReplicatorStatsImpl rStat = replicator.computeStats();
// Add incoming msg rates
PublisherStatsImpl pubStats = topicStatsHelper.remotePublishersStats.get(replicator.getRemoteCluster());
rStat.msgRateIn = pubStats != null ? pubStats.msgRateIn : 0;
rStat.msgThroughputIn = pubStats != null ? pubStats.msgThroughputIn : 0;
rStat.inboundConnection = pubStats != null ? pubStats.getAddress() : null;
rStat.inboundConnectedSince = pubStats != null ? pubStats.getConnectedSince() : null;
topicStatsHelper.aggMsgRateOut += rStat.msgRateOut;
topicStatsHelper.aggMsgThroughputOut += rStat.msgThroughputOut;
// Populate replicator specific stats here
topicStatsStream.startObject(cluster);
topicStatsStream.writePair("connected", rStat.connected);
topicStatsStream.writePair("msgRateExpired", rStat.msgRateExpired);
topicStatsStream.writePair("msgRateIn", rStat.msgRateIn);
topicStatsStream.writePair("msgRateOut", rStat.msgRateOut);
topicStatsStream.writePair("msgThroughputIn", rStat.msgThroughputIn);
topicStatsStream.writePair("msgThroughputOut", rStat.msgThroughputOut);
topicStatsStream.writePair("replicationBacklog", rStat.replicationBacklog);
topicStatsStream.writePair("replicationDelayInSeconds", rStat.replicationDelayInSeconds);
topicStatsStream.writePair("inboundConnection", rStat.inboundConnection);
topicStatsStream.writePair("inboundConnectedSince", rStat.inboundConnectedSince);
topicStatsStream.writePair("outboundConnection", rStat.outboundConnection);
topicStatsStream.writePair("outboundConnectedSince", rStat.outboundConnectedSince);
topicStatsStream.endObject();
nsStats.msgReplBacklog += rStat.replicationBacklog;
if (replStats.isMetricsEnabled()) {
String namespaceClusterKey = replStats.getKeyName(namespace, cluster);
ReplicationMetrics replicationMetrics = replStats.get(namespaceClusterKey);
boolean update = false;
if (replicationMetrics == null) {
replicationMetrics = ReplicationMetrics.get();
update = true;
}
replicationMetrics.connected += rStat.connected ? 1 : 0;
replicationMetrics.msgRateOut += rStat.msgRateOut;
replicationMetrics.msgThroughputOut += rStat.msgThroughputOut;
replicationMetrics.msgReplBacklog += rStat.replicationBacklog;
if (update) {
replStats.put(namespaceClusterKey, replicationMetrics);
}
// replication delay for a namespace is the max repl-delay among all the topics under this namespace
if (rStat.replicationDelayInSeconds > replicationMetrics.maxMsgReplDelayInSeconds) {
replicationMetrics.maxMsgReplDelayInSeconds = rStat.replicationDelayInSeconds;
}
}
});
// Close replication
topicStatsStream.endObject();
// Start subscription stats
topicStatsStream.startObject("subscriptions");
nsStats.subsCount += subscriptions.size();
subscriptions.forEach((subscriptionName, subscription) -> {
double subMsgRateOut = 0;
double subMsgThroughputOut = 0;
double subMsgRateRedeliver = 0;
double subMsgAckRate = 0;
// Start subscription name & consumers
try {
topicStatsStream.startObject(subscriptionName);
topicStatsStream.startList("consumers");
for (Consumer consumer : subscription.getConsumers()) {
++nsStats.consumerCount;
++bundleStats.consumerCount;
consumer.updateRates();
ConsumerStatsImpl consumerStats = consumer.getStats();
subMsgRateOut += consumerStats.msgRateOut;
subMsgAckRate += consumerStats.messageAckRate;
subMsgThroughputOut += consumerStats.msgThroughputOut;
subMsgRateRedeliver += consumerStats.msgRateRedeliver;
StreamingStats.writeConsumerStats(topicStatsStream, subscription.getType(), consumerStats);
}
// Close Consumer stats
topicStatsStream.endList();
// Populate subscription specific stats here
topicStatsStream.writePair("msgBacklog",
subscription.getNumberOfEntriesInBacklog(true));
topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
topicStatsStream.writePair("msgRateOut", subMsgRateOut);
topicStatsStream.writePair("messageAckRate", subMsgAckRate);
topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
topicStatsStream.writePair("numberOfEntriesSinceFirstNotAckedMessage",
subscription.getNumberOfEntriesSinceFirstNotAckedMessage());
topicStatsStream.writePair("totalNonContiguousDeletedMessagesRange",
subscription.getTotalNonContiguousDeletedMessagesRange());
topicStatsStream.writePair("type", subscription.getTypeString());
Dispatcher dispatcher0 = subscription.getDispatcher();
if (null != dispatcher0) {
topicStatsStream.writePair("filterProcessedMsgCount",
dispatcher0.getFilterProcessedMsgCount());
topicStatsStream.writePair("filterAcceptedMsgCount",
dispatcher0.getFilterAcceptedMsgCount());
topicStatsStream.writePair("filterRejectedMsgCount",
dispatcher0.getFilterRejectedMsgCount());
topicStatsStream.writePair("filterRescheduledMsgCount",
dispatcher0.getFilterRescheduledMsgCount());
}
if (Subscription.isIndividualAckMode(subscription.getType())) {
if (subscription.getDispatcher() instanceof AbstractPersistentDispatcherMultipleConsumers) {
AbstractPersistentDispatcherMultipleConsumers dispatcher =
(AbstractPersistentDispatcherMultipleConsumers) subscription.getDispatcher();
topicStatsStream.writePair("blockedSubscriptionOnUnackedMsgs",
dispatcher.isBlockedDispatcherOnUnackedMsgs());
topicStatsStream.writePair("unackedMessages",
dispatcher.getTotalUnackedMessages());
}
}
// Close consumers
topicStatsStream.endObject();
topicStatsHelper.aggMsgRateOut += subMsgRateOut;
topicStatsHelper.aggMsgThroughputOut += subMsgThroughputOut;
nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog(false);
// check stuck subscription
if (brokerService.getPulsar().getConfig().isUnblockStuckSubscriptionEnabled()) {
subscription.checkAndUnblockIfStuck();
}
} catch (Exception e) {
log.error("Got exception when creating consumer stats for subscription {}: {}", subscriptionName,
e.getMessage(), e);
}
});
// Close subscription
topicStatsStream.endObject();
// Remaining dest stats.
topicStatsHelper.averageMsgSize = topicStatsHelper.aggMsgRateIn == 0.0 ? 0.0
: (topicStatsHelper.aggMsgThroughputIn / topicStatsHelper.aggMsgRateIn);
topicStatsStream.writePair("producerCount", producerCount.intValue());
topicStatsStream.writePair("averageMsgSize", topicStatsHelper.averageMsgSize);
topicStatsStream.writePair("msgRateIn", topicStatsHelper.aggMsgRateIn);
topicStatsStream.writePair("msgRateOut", topicStatsHelper.aggMsgRateOut);
topicStatsStream.writePair("msgInCount", getMsgInCounter());
topicStatsStream.writePair("bytesInCount", getBytesInCounter());
topicStatsStream.writePair("msgOutCount", getMsgOutCounter());
topicStatsStream.writePair("bytesOutCount", getBytesOutCounter());
topicStatsStream.writePair("msgThroughputIn", topicStatsHelper.aggMsgThroughputIn);
topicStatsStream.writePair("msgThroughputOut", topicStatsHelper.aggMsgThroughputOut);
topicStatsStream.writePair("storageSize", ledger.getTotalSize());
topicStatsStream.writePair("backlogSize", ledger.getEstimatedBacklogSize());
topicStatsStream.writePair("pendingAddEntriesCount", ledger.getPendingAddEntriesCount());
topicStatsStream.writePair("filteredEntriesCount", getFilteredEntriesCount());
nsStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
nsStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
nsStats.msgThroughputIn += topicStatsHelper.aggMsgThroughputIn;
nsStats.msgThroughputOut += topicStatsHelper.aggMsgThroughputOut;
nsStats.storageSize += ledger.getEstimatedBacklogSize();
bundleStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
bundleStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
bundleStats.msgThroughputIn += topicStatsHelper.aggMsgThroughputIn;
bundleStats.msgThroughputOut += topicStatsHelper.aggMsgThroughputOut;
bundleStats.cacheSize += ledger.getCacheSize();
// Close topic object
topicStatsStream.endObject();
// add publish-latency metrics
this.addEntryLatencyStatsUsec.refresh();
NamespaceStats.add(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket);
this.addEntryLatencyStatsUsec.reset();
}