public void collectBrokerStatsTopic()

in src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java [421:583]


    /**
     * Collects per-topic and per-consumer-group broker statistics and records them on the metrics collector.
     *
     * <p>For every topic returned by the name server (retry and DLQ topics are skipped) the task queries the
     * master broker of each route entry for {@code TOPIC_PUT_NUMS} and {@code TOPIC_PUT_SIZE}, and then, for
     * every consumer group of the topic, for {@code GROUP_GET_NUMS}, {@code GROUP_GET_SIZE} and
     * {@code SNDBCK_PUT_NUMS}.
     *
     * <p>The task is best-effort: a failure for one topic, broker or statistic is logged (or deliberately
     * ignored for {@code SYSTEM_ERROR} responses) and the remaining items are still processed. The task returns
     * immediately when collection is disabled, when the topic list or cluster info cannot be fetched, or when
     * the executing thread has been interrupted.
     */
    public void collectBrokerStatsTopic() {
        if (!rmqConfigure.isEnableCollect()) {
            return;
        }
        log.info("broker topic stats collection task starting....");
        long start = System.currentTimeMillis();
        Set<String> topicSet = null;
        try {
            TopicList topicList = mqAdminExt.fetchAllTopicList();
            topicSet = topicList.getTopicList();
        } catch (Exception ex) {
            restoreInterruptIfNeeded(ex);
            log.error(String.format("collectBrokerStatsTopic-fetch topic list from namesrv error, the address is %s",
                JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
            return;
        }
        if (topicSet == null || topicSet.isEmpty()) {
            return;
        }
        ClusterInfo clusterInfo = null;
        try {
            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
        } catch (Exception ex) {
            restoreInterruptIfNeeded(ex);
            log.error(String.format("collectBrokerStatsTopic-fetch cluster info exception, the address is %s",
                JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
            return;
        }

        for (String topic : topicSet) {
            // Once the thread is interrupted there is no point in issuing further remote calls.
            if (Thread.currentThread().isInterrupted()) {
                log.warn("broker topic stats collection task interrupted, aborting");
                return;
            }
            if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) || topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
                continue;
            }
            TopicRouteData topicRouteData = null;
            try {
                topicRouteData = mqAdminExt.examineTopicRouteInfo(topic);
            } catch (Exception ex) {
                restoreInterruptIfNeeded(ex);
                log.error(String.format("fetch topic route error. ignore %s", topic), ex);
                continue;
            }
            if (topicRouteData == null || topicRouteData.getBrokerDatas() == null) {
                continue;
            }

            collectTopicPutMetrics(topic, topicRouteData, clusterInfo);

            GroupList groupList = null;
            try {
                groupList = mqAdminExt.queryTopicConsumeByWho(topic);
            } catch (Exception ex) {
                restoreInterruptIfNeeded(ex);
                // Deliberately quiet: this fails for many topics, so it is only reported at debug level.
                log.debug("collectBrokerStatsTopic-fetch consumers for topic({}) error, ignore this topic", topic, ex);
                continue;
            }
            if (groupList == null || groupList.getGroupList() == null || groupList.getGroupList().isEmpty()) {
                continue;
            }
            collectGroupMetrics(topic, topicRouteData, groupList);
        }
        log.info("broker topic stats collection task finished...." + (System.currentTimeMillis() - start));
    }

    /**
     * Records the put-count and put-size metrics of {@code topic} for the master of every broker in its route.
     */
    private void collectTopicPutMetrics(String topic, TopicRouteData topicRouteData, ClusterInfo clusterInfo) {
        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
            String masterAddr = bd.getBrokerAddrs() == null ? null : bd.getBrokerAddrs().get(MixAll.MASTER_ID);
            if (StringUtils.isBlank(masterAddr)) {
                continue;
            }
            String brokerIP = lookupClusterMasterAddr(clusterInfo, bd.getBrokerName(), masterAddr);

            //how many messages has sent for the topic
            BrokerStatsData putNums = fetchBrokerStatsOrNull(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic,
                "TOPIC_PUT_NUMS-error, topic=%s, master broker=%s", topic, masterAddr);
            if (putNums != null) {
                metricsService.getCollector().addTopicPutNumsMetric(
                    bd.getCluster(),
                    bd.getBrokerName(),
                    brokerIP,
                    topic,
                    Utils.getFixedDouble(putNums.getStatsMinute().getSum())
                );
            }

            //how many bytes has sent for the topic
            BrokerStatsData putSize = fetchBrokerStatsOrNull(masterAddr, BrokerStatsManager.TOPIC_PUT_SIZE, topic,
                "TOPIC_PUT_SIZE-error, topic=%s, master broker=%s", topic, masterAddr);
            if (putSize != null) {
                metricsService.getCollector().addTopicPutSizeMetric(
                    bd.getCluster(),
                    bd.getBrokerName(),
                    brokerIP,
                    topic,
                    Utils.getFixedDouble(putSize.getStatsMinute().getSum())
                );
            }
        }
    }

    /**
     * Records the get-count, get-size and send-back metrics of every consumer group of {@code topic} for the
     * master of every broker in the topic's route.
     */
    private void collectGroupMetrics(String topic, TopicRouteData topicRouteData, GroupList groupList) {
        for (String group : groupList.getGroupList()) {
            // The stats key only depends on topic and group, so it is built once per group.
            String statsKey = String.format("%s@%s", topic, group);
            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                String masterAddr = bd.getBrokerAddrs() == null ? null : bd.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (StringUtils.isBlank(masterAddr)) {
                    continue;
                }

                //how many messages the consumer has get for the topic
                BrokerStatsData getNums = fetchBrokerStatsOrNull(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey,
                    "GROUP_GET_NUMS-error, topic=%s, group=%s,master broker=%s", topic, group, masterAddr);
                if (getNums != null) {
                    metricsService.getCollector().addGroupGetNumsMetric(
                        bd.getCluster(),
                        bd.getBrokerName(),
                        topic,
                        group,
                        Utils.getFixedDouble(getNums.getStatsMinute().getSum()));
                }

                //how many bytes the consumer has get for the topic
                BrokerStatsData getSize = fetchBrokerStatsOrNull(masterAddr, BrokerStatsManager.GROUP_GET_SIZE, statsKey,
                    "GROUP_GET_SIZE-error, topic=%s, group=%s, master broker=%s", topic, group, masterAddr);
                if (getSize != null) {
                    metricsService.getCollector().addGroupGetSizeMetric(
                        bd.getCluster(),
                        bd.getBrokerName(),
                        topic,
                        group,
                        Utils.getFixedDouble(getSize.getStatsMinute().getSum()));
                }

                //how many re-send times the consumer did for the topic
                BrokerStatsData sendBack = fetchBrokerStatsOrNull(masterAddr, BrokerStatsManager.SNDBCK_PUT_NUMS, statsKey,
                    "SNDBCK_PUT_NUMS-error, topic=%s, group=%s, master broker=%s", topic, group, masterAddr);
                if (sendBack != null) {
                    metricsService.getCollector().addSendBackNumsMetric(
                        bd.getCluster(),
                        bd.getBrokerName(),
                        topic,
                        group,
                        sendBack.getStatsMinute().getSum());
                }
            }
        }
    }

    /**
     * Queries one statistic from a broker, returning {@code null} instead of throwing when the query fails.
     *
     * <p>An {@code MQClientException} whose response code is {@code SYSTEM_ERROR} is ignored silently; every
     * other failure is logged with the message built from {@code errorFormat} and {@code errorArgs}. The message
     * is only formatted on the failure path. On {@code InterruptedException} the interrupt status of the current
     * thread is restored before returning.
     *
     * @param masterAddr  address of the broker to query
     * @param statsName   name of the statistic to read
     * @param statsKey    key of the statistic entry
     * @param errorFormat {@link String#format} pattern for the error log message
     * @param errorArgs   arguments for {@code errorFormat}
     * @return the stats data, or {@code null} if the query failed
     */
    private BrokerStatsData fetchBrokerStatsOrNull(String masterAddr, String statsName, String statsKey,
        String errorFormat, Object... errorArgs) {
        try {
            return mqAdminExt.viewBrokerStatsData(masterAddr, statsName, statsKey);
        } catch (MQClientException ex) {
            // SYSTEM_ERROR responses are intentionally not logged, matching the task's long-standing behavior.
            if (ex.getResponseCode() != ResponseCode.SYSTEM_ERROR) {
                log.error(String.format(errorFormat, errorArgs), ex);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            log.error(String.format(errorFormat, errorArgs), ex);
        } catch (RemotingConnectException | RemotingTimeoutException | RemotingSendRequestException ex) {
            log.error(String.format(errorFormat, errorArgs), ex);
        }
        return null;
    }

    /**
     * Returns the master address registered for {@code brokerName} in the cluster info, or {@code fallbackAddr}
     * when the cluster info has no usable entry for that broker.
     */
    private String lookupClusterMasterAddr(ClusterInfo clusterInfo, String brokerName, String fallbackAddr) {
        if (clusterInfo == null || clusterInfo.getBrokerAddrTable() == null) {
            return fallbackAddr;
        }
        BrokerData clusterBroker = clusterInfo.getBrokerAddrTable().get(brokerName);
        if (clusterBroker == null || clusterBroker.getBrokerAddrs() == null) {
            return fallbackAddr;
        }
        String clusterMasterAddr = clusterBroker.getBrokerAddrs().get(MixAll.MASTER_ID);
        return StringUtils.isBlank(clusterMasterAddr) ? fallbackAddr : clusterMasterAddr;
    }

    /**
     * Re-asserts the current thread's interrupt status when a broad catch has absorbed an
     * {@code InterruptedException}.
     */
    private void restoreInterruptIfNeeded(Exception ex) {
        if (ex instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }