public void collectTopicOffset()

in src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java [143:204]


    /**
     * Collects per-broker topic offsets and reports them to the metrics collector.
     *
     * <p>Does nothing when collection is disabled via {@code rmqConfigure.isEnableCollect()}.
     * Otherwise the topic list is fetched through {@code mqAdminExt}; for each topic the queue
     * offsets returned by {@code examineTopicStats} are aggregated by broker name:
     * <ul>
     *   <li>the max offsets of all queues on a broker are summed;</li>
     *   <li>the largest last-update timestamp among those queues is kept.</li>
     * </ul>
     * One {@code addTopicOffsetMetric} call is made per (broker, topic) pair.
     *
     * <p>Failures are logged and never propagated from the admin calls: a failure to fetch the
     * topic list ends the run, a failure for a single topic skips only that topic.
     */
    public void collectTopicOffset() {
        if (!rmqConfigure.isEnableCollect()) {
            return;
        }
        log.info("topic offset collection task starting....");
        long start = System.currentTimeMillis();
        TopicList topicList;
        try {
            topicList = mqAdminExt.fetchAllTopicList();
        } catch (Exception ex) {
            // Pass the exception to the logger so the root cause and stack trace are not lost.
            log.error(String.format("collectTopicOffset-exception comes getting topic list from namesrv, address is %s",
                JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
            return;
        }
        Set<String> topicSet = topicList != null ? topicList.getTopicList() : null;
        if (topicSet == null || topicSet.isEmpty()) {
            log.error(String.format("collectTopicOffset-the topic list is empty. the namesrv address is %s",
                JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
            return;
        }

        for (String topic : topicSet) {
            TopicStatsTable topicStats;
            try {
                topicStats = mqAdminExt.examineTopicStats(topic);
            } catch (Exception ex) {
                log.error(String.format("collectTopicOffset-getting topic(%s) stats error. the namesrv address is %s",
                    topic,
                    JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
                continue;
            }

            // Guard against a missing stats/offset table so one bad topic cannot abort the
            // whole run with a NullPointerException.
            Map<MessageQueue, TopicOffset> offsetTable = topicStats != null ? topicStats.getOffsetTable() : null;
            if (offsetTable == null) {
                log.warn(String.format("collectTopicOffset-topic(%s) stats has no offset table, skipped.", topic));
                continue;
            }

            // broker name -> sum of max offsets over that broker's queues
            Map<String, Long> brokerOffsetMap = new HashMap<>();
            // broker name -> most recent last-update timestamp over that broker's queues
            Map<String, Long> brokerUpdateTimestampMap = new HashMap<>();

            for (Map.Entry<MessageQueue, TopicOffset> topicStatusEntry : offsetTable.entrySet()) {
                String brokerName = topicStatusEntry.getKey().getBrokerName();
                TopicOffset offset = topicStatusEntry.getValue();

                long maxOffset = offset.getMaxOffset();
                Long offsetSoFar = brokerOffsetMap.get(brokerName);
                brokerOffsetMap.put(brokerName, offsetSoFar == null ? maxOffset : offsetSoFar + maxOffset);

                long lastUpdateTimestamp = offset.getLastUpdateTimestamp();
                Long latestSoFar = brokerUpdateTimestampMap.get(brokerName);
                if (latestSoFar == null || lastUpdateTimestamp > latestSoFar) {
                    brokerUpdateTimestampMap.put(brokerName, lastUpdateTimestamp);
                }
            }

            for (Map.Entry<String, Long> brokerOffsetEntry : brokerOffsetMap.entrySet()) {
                metricsService.getCollector().addTopicOffsetMetric(clusterName, brokerOffsetEntry.getKey(), topic,
                    brokerUpdateTimestampMap.get(brokerOffsetEntry.getKey()), brokerOffsetEntry.getValue());
            }
        }
        log.info("topic offset collection task finished...." + (System.currentTimeMillis() - start));
    }