private void processAllGroup()

in broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java [136:211]


    private void processAllGroup(Consumer<ProcessGroupInfo> consumer) {
        for (Map.Entry<String, SubscriptionGroupConfig> subscriptionEntry :
            subscriptionGroupManager.getSubscriptionGroupTable().entrySet()) {

            String group = subscriptionEntry.getKey();
            ConsumerGroupInfo consumerGroupInfo = consumerManager.getConsumerGroupInfo(group, true);
            boolean isPop = false;
            if (consumerGroupInfo != null) {
                isPop = consumerGroupInfo.getConsumeType() == ConsumeType.CONSUME_POP;
            }
            Set<String> topics;
            if (brokerConfig.isUseStaticSubscription()) {
                SubscriptionGroupConfig subscriptionGroupConfig = subscriptionEntry.getValue();
                if (subscriptionGroupConfig.getSubscriptionDataSet() == null ||
                    subscriptionGroupConfig.getSubscriptionDataSet().isEmpty()) {
                    continue;
                }
                topics = subscriptionGroupConfig.getSubscriptionDataSet()
                    .stream()
                    .map(SimpleSubscriptionData::getTopic)
                    .collect(Collectors.toSet());
            } else {
                if (consumerGroupInfo == null) {
                    continue;
                }
                topics = consumerGroupInfo.getSubscribeTopics();
            }

            if (null == topics || topics.isEmpty()) {
                continue;
            }
            for (String topic : topics) {
                // skip retry topic
                if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    continue;
                }

                TopicConfig topicConfig = topicConfigManager.selectTopicConfig(topic);
                if (topicConfig == null) {
                    continue;
                }

                // skip no perm topic
                int topicPerm = topicConfig.getPerm() & brokerConfig.getBrokerPermission();
                if (!PermName.isReadable(topicPerm) && !PermName.isWriteable(topicPerm)) {
                    continue;
                }

                if (isPop) {
                    String retryTopic = KeyBuilder.buildPopRetryTopic(topic, group, brokerConfig.isEnableRetryTopicV2());
                    TopicConfig retryTopicConfig = topicConfigManager.selectTopicConfig(retryTopic);
                    if (retryTopicConfig != null) {
                        int retryTopicPerm = retryTopicConfig.getPerm() & brokerConfig.getBrokerPermission();
                        if (PermName.isReadable(retryTopicPerm) || PermName.isWriteable(retryTopicPerm)) {
                            consumer.accept(new ProcessGroupInfo(group, topic, true, retryTopic));
                            continue;
                        }
                    }
                    if (brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
                        String retryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
                        TopicConfig retryTopicConfigV1 = topicConfigManager.selectTopicConfig(retryTopicV1);
                        if (retryTopicConfigV1 != null) {
                            int retryTopicPerm = retryTopicConfigV1.getPerm() & brokerConfig.getBrokerPermission();
                            if (PermName.isReadable(retryTopicPerm) || PermName.isWriteable(retryTopicPerm)) {
                                consumer.accept(new ProcessGroupInfo(group, topic, true, retryTopicV1));
                                continue;
                            }
                        }
                    }
                    consumer.accept(new ProcessGroupInfo(group, topic, true, null));
                } else {
                    consumer.accept(new ProcessGroupInfo(group, topic, false, null));
                }
            }
        }
    }