in broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java [136:211]
/**
 * Walks the subscription group table and passes one {@code ProcessGroupInfo} to
 * {@code consumer} for every (group, topic) pair that survives the filters below.
 *
 * <p>Topic source per group:
 * <ul>
 *   <li>static subscription enabled: the topics of the group's configured subscription data set;</li>
 *   <li>otherwise: the topics reported by the consumer group info (groups without one are skipped).</li>
 * </ul>
 *
 * <p>A topic is skipped when its name starts with {@code MixAll.RETRY_GROUP_TOPIC_PREFIX}, when it
 * has no topic config, or when it is neither readable nor writable after masking its permission
 * with the broker permission.
 *
 * <p>For groups whose consume type is {@code CONSUME_POP}, the emitted info also carries a pop retry
 * topic name: the name built by {@code KeyBuilder.buildPopRetryTopic} if that topic is configured
 * and accessible, else the V1 name (only when retry topic V2 and V1 retrieval are both enabled and
 * the V1 topic is configured and accessible), else {@code null}.
 *
 * @param consumer callback invoked once per accepted (group, topic) pair
 */
private void processAllGroup(Consumer<ProcessGroupInfo> consumer) {
    for (Map.Entry<String, SubscriptionGroupConfig> groupEntry :
        subscriptionGroupManager.getSubscriptionGroupTable().entrySet()) {
        final String group = groupEntry.getKey();
        final ConsumerGroupInfo liveGroupInfo = consumerManager.getConsumerGroupInfo(group, true);
        // A group with no consumer group info is handled as a non-pop group.
        final boolean popGroup = liveGroupInfo != null
            && liveGroupInfo.getConsumeType() == ConsumeType.CONSUME_POP;

        final Set<String> subscribedTopics;
        if (!brokerConfig.isUseStaticSubscription()) {
            // Dynamic mode: topics come from the consumer group info, so it must exist.
            if (liveGroupInfo == null) {
                continue;
            }
            subscribedTopics = liveGroupInfo.getSubscribeTopics();
        } else {
            // Static mode: topics come from the group's configured subscription data.
            final SubscriptionGroupConfig groupConfig = groupEntry.getValue();
            if (groupConfig.getSubscriptionDataSet() == null ||
                groupConfig.getSubscriptionDataSet().isEmpty()) {
                continue;
            }
            subscribedTopics = groupConfig.getSubscriptionDataSet()
                .stream()
                .map(SimpleSubscriptionData::getTopic)
                .collect(Collectors.toSet());
        }
        if (subscribedTopics == null || subscribedTopics.isEmpty()) {
            continue;
        }

        for (String topic : subscribedTopics) {
            // Retry topics are not reported as standalone entries.
            if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                continue;
            }
            final TopicConfig config = topicConfigManager.selectTopicConfig(topic);
            if (config == null) {
                continue;
            }
            // Effective permission is the topic permission masked by the broker permission.
            final int effectivePerm = config.getPerm() & brokerConfig.getBrokerPermission();
            if (!(PermName.isReadable(effectivePerm) || PermName.isWriteable(effectivePerm))) {
                continue;
            }

            if (!popGroup) {
                consumer.accept(new ProcessGroupInfo(group, topic, false, null));
                continue;
            }

            // Pop group: look for an accessible retry topic, current naming first.
            ProcessGroupInfo popInfo = null;
            final String popRetryTopic =
                KeyBuilder.buildPopRetryTopic(topic, group, brokerConfig.isEnableRetryTopicV2());
            final TopicConfig popRetryConfig = topicConfigManager.selectTopicConfig(popRetryTopic);
            if (popRetryConfig != null) {
                final int popRetryPerm = popRetryConfig.getPerm() & brokerConfig.getBrokerPermission();
                if (PermName.isReadable(popRetryPerm) || PermName.isWriteable(popRetryPerm)) {
                    popInfo = new ProcessGroupInfo(group, topic, true, popRetryTopic);
                }
            }

            // Fall back to the V1 retry topic name only when V2 is on and V1 retrieval is allowed.
            if (popInfo == null
                && brokerConfig.isEnableRetryTopicV2()
                && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
                final String legacyRetryTopic = KeyBuilder.buildPopRetryTopicV1(topic, group);
                final TopicConfig legacyRetryConfig = topicConfigManager.selectTopicConfig(legacyRetryTopic);
                if (legacyRetryConfig != null) {
                    final int legacyRetryPerm =
                        legacyRetryConfig.getPerm() & brokerConfig.getBrokerPermission();
                    if (PermName.isReadable(legacyRetryPerm) || PermName.isWriteable(legacyRetryPerm)) {
                        popInfo = new ProcessGroupInfo(group, topic, true, legacyRetryTopic);
                    }
                }
            }

            // No accessible retry topic found: still report the pop group, without a retry topic.
            if (popInfo == null) {
                popInfo = new ProcessGroupInfo(group, topic, true, null);
            }
            consumer.accept(popInfo);
        }
    }
}