public void collectConsumerOffset()

in src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java [255:418]


    /**
     * Collects consumer-side metrics for every (topic, consumer group) pair known to the name server.
     *
     * <p>Does nothing when collection is disabled in {@code rmqConfigure}. Otherwise the topic list is
     * fetched through {@code mqAdminExt}; topics whose name starts with
     * {@link MixAll#DLQ_GROUP_TOPIC_PREFIX} are skipped, as are topics for which no consumer group can
     * be resolved. For each remaining group the following is reported to the metrics collector:
     * <ul>
     *   <li>the number of online consumer connections together with their addresses;</li>
     *   <li>the total consume diff (only for {@link MessageModel#CLUSTERING});</li>
     *   <li>the sum of consumer offsets per broker;</li>
     *   <li>the largest per-queue consume latency per broker, derived from the store timestamp of the
     *       message at the consumer offset (only for {@link MessageModel#CLUSTERING}).</li>
     * </ul>
     * When a group has online consumers a {@code ClientMetricTaskRunnable} is additionally submitted
     * to {@code collectClientMetricExecutor}.
     *
     * <p>Failures for a single topic or group are logged (or handed to
     * {@code handleTopicNotExistException}) and do not stop the remaining collection. If the thread is
     * interrupted while querying the admin API, the interrupt flag is restored and the run ends early.
     */
    public void collectConsumerOffset() {
        if (!rmqConfigure.isEnableCollect()) {
            return;
        }
        log.info("consumer offset collection task starting....");
        long start = System.currentTimeMillis();
        TopicList topicList = null;
        try {
            topicList = mqAdminExt.fetchAllTopicList();
        } catch (Exception ex) {
            log.error(String.format("collectConsumerOffset-fetch topic list from namesrv error, the address is %s",
                JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
            return;
        }

        Set<String> topicSet = topicList.getTopicList();
        for (String topic : topicSet) {
            // Dead-letter-queue topics are not reported by this task.
            if (topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
                continue;
            }

            GroupList groupList = null;
            try {
                groupList = mqAdminExt.queryTopicConsumeByWho(topic);
            } catch (Exception ignored) {
                // Deliberately not logged: the lookup failing for a topic is treated the same as
                // the topic having no consumer group, and the topic is skipped.
                continue;
            }

            if (groupList == null || groupList.getGroupList() == null || groupList.getGroupList().isEmpty()) {
                // No consumer group for this topic, nothing to report.
                continue;
            }

            for (String group : groupList.getGroupList()) {
                ConsumeStats consumeStats = null;
                ConsumerConnection onlineConsumers = null;
                // Used when the connection info is unavailable or carries no message model.
                MessageModel messageModel = MessageModel.CLUSTERING;
                try {
                    onlineConsumers = mqAdminExt.examineConsumerConnectionInfo(group);
                    if (onlineConsumers.getMessageModel() != null) {
                        messageModel = onlineConsumers.getMessageModel();
                    }
                } catch (InterruptedException ex) {
                    log.error(String.format("get topic's(%s) online consumers(%s) exception", topic, group), ex);
                    // Restore the flag and stop: further blocking admin calls would fail the same way.
                    Thread.currentThread().interrupt();
                    return;
                } catch (RemotingException ex) {
                    log.error(String.format("get topic's(%s) online consumers(%s) exception", topic, group), ex);
                } catch (MQClientException ex) {
                    handleTopicNotExistException(ex.getResponseCode(), ex, topic, group);
                } catch (MQBrokerException ex) {
                    handleTopicNotExistException(ex.getResponseCode(), ex, topic, group);
                }

                int countOfOnlineConsumers = 0;
                if (onlineConsumers != null && onlineConsumers.getConnectionSet() != null) {
                    countOfOnlineConsumers = onlineConsumers.getConnectionSet().size();
                }

                // The group count metric is reported even when nobody is online (empty addresses, count 0).
                String cAddrs = "";
                String localAddrs = "";
                if (countOfOnlineConsumers > 0) {
                    TwoTuple<String, String> addresses = buildClientAddresses(onlineConsumers.getConnectionSet());
                    cAddrs = addresses.getFirst();
                    localAddrs = addresses.getSecond();
                }
                metricsService.getCollector().addGroupCountMetric(group, cAddrs, localAddrs, countOfOnlineConsumers);

                if (countOfOnlineConsumers > 0) {
                    collectClientMetricExecutor.submit(new ClientMetricTaskRunnable(
                        group,
                        onlineConsumers,
                        false,
                        this.mqAdminExt,
                        log,
                        this.metricsService
                    ));
                }

                try {
                    consumeStats = mqAdminExt.examineConsumeStats(group, topic);
                } catch (InterruptedException ex) {
                    log.error(String.format("get topic's(%s) consumer-stats(%s) exception", topic, group), ex);
                    // Restore the flag and stop: further blocking admin calls would fail the same way.
                    Thread.currentThread().interrupt();
                    return;
                } catch (RemotingException ex) {
                    log.error(String.format("get topic's(%s) consumer-stats(%s) exception", topic, group), ex);
                } catch (MQClientException ex) {
                    handleTopicNotExistException(ex.getResponseCode(), ex, topic, group);
                } catch (MQBrokerException ex) {
                    handleTopicNotExistException(ex.getResponseCode(), ex, topic, group);
                }
                if (consumeStats == null || consumeStats.getOffsetTable() == null || consumeStats.getOffsetTable().isEmpty()) {
                    // No offsets recorded for this group on this topic, nothing more to report.
                    continue;
                }

                if (messageModel == MessageModel.CLUSTERING) {
                    long diff = consumeStats.computeTotalDiff();
                    metricsService.getCollector().addGroupDiffMetric(
                        String.valueOf(countOfOnlineConsumers),
                        group,
                        topic,
                        String.valueOf(messageModel.ordinal()),
                        diff
                    );
                }

                // Sum of consumer offsets over all queues, grouped by broker name.
                try {
                    Map<String, Long> consumeOffsetMap = new HashMap<>();
                    for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStats.getOffsetTable().entrySet()) {
                        String brokerName = consumeStatusEntry.getKey().getBrokerName();
                        long consumerOffset = consumeStatusEntry.getValue().getConsumerOffset();
                        Long previousTotal = consumeOffsetMap.get(brokerName);
                        consumeOffsetMap.put(brokerName,
                            previousTotal == null ? consumerOffset : previousTotal + consumerOffset);
                    }
                    for (Map.Entry<String, Long> consumeOffsetEntry : consumeOffsetMap.entrySet()) {
                        metricsService.getCollector().addGroupBrokerTotalOffsetMetric(clusterName,
                            consumeOffsetEntry.getKey(), topic, group, consumeOffsetEntry.getValue());
                    }
                } catch (Exception ex) {
                    log.warn("addGroupBrokerTotalOffsetMetric error", ex);
                }

                // Largest per-queue consume latency, grouped by broker name.
                if (MessageModel.CLUSTERING == messageModel) {
                    try {
                        // Cast once per group; kept inside the try so a failing cast is still only logged.
                        MQAdminExtImpl adminImpl = (MQAdminExtImpl) mqAdminExt;
                        Map<String, Long> consumerLatencyMap = new HashMap<>();
                        for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStats.getOffsetTable().entrySet()) {
                            MessageQueue q = consumeStatusEntry.getKey();
                            OffsetWrapper offset = consumeStatusEntry.getValue();
                            PullResult consumePullResult = adminImpl.queryMsgByOffset(q, offset.getConsumerOffset());
                            long lagTime = 0;
                            // A null pull result is treated as "no latency information" for this queue
                            // instead of aborting the latency metric for the whole group.
                            if (consumePullResult != null) {
                                if (consumePullResult.getPullStatus() == PullStatus.FOUND) {
                                    // A consumer that has caught up with the broker has no lag.
                                    if (offset.getBrokerOffset() != offset.getConsumerOffset()) {
                                        lagTime = System.currentTimeMillis()
                                            - consumePullResult.getMsgFoundList().get(0).getStoreTimestamp();
                                    }
                                } else if (consumePullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL) {
                                    // Fall back to the message at the minimum offset reported by the pull result.
                                    PullResult pullResult = adminImpl.queryMsgByOffset(q, consumePullResult.getMinOffset());
                                    if (pullResult != null && pullResult.getPullStatus() == PullStatus.FOUND) {
                                        lagTime = System.currentTimeMillis()
                                            - pullResult.getMsgFoundList().get(0).getStoreTimestamp();
                                    }
                                }
                            }

                            String brokerName = q.getBrokerName();
                            Long currentMax = consumerLatencyMap.get(brokerName);
                            if (currentMax == null) {
                                // First queue seen for this broker; negative lag values are clamped to zero.
                                consumerLatencyMap.put(brokerName, Math.max(lagTime, 0L));
                            } else if (lagTime > currentMax) {
                                consumerLatencyMap.put(brokerName, lagTime);
                            }
                        }
                        for (Map.Entry<String, Long> consumeLatencyEntry : consumerLatencyMap.entrySet()) {
                            metricsService.getCollector().addGroupGetLatencyByStoreTimeMetric(clusterName,
                                    consumeLatencyEntry.getKey(), topic, group, consumeLatencyEntry.getValue());
                        }
                    } catch (Exception ex) {
                        log.warn("addGroupGetLatencyByStoreTimeMetric error", ex);
                    }
                }
            }
        }
        log.info("consumer offset collection task finished...." + (System.currentTimeMillis() - start));
    }