private void collectClusterData()

in hertzbeat-collector/hertzbeat-collector-rocketmq/src/main/java/org/apache/hertzbeat/collector/collect/rocketmq/RocketmqSingleCollectImpl.java [187:248]


    /**
     * Gathers runtime statistics for every broker address reported by the cluster and stores
     * them on {@code rocketmqCollectData} as a list of {@code ClusterBrokerData}.
     *
     * <p>The (initially empty) result list is attached to {@code rocketmqCollectData} before any
     * query is issued, and each entry is appended to the list before its statistics are fetched,
     * so a failure part-way through leaves the entries created so far in the list.
     *
     * @param mqAdminExt          admin client used to read the cluster layout and broker runtime stats
     * @param rocketmqCollectData holder that receives the collected broker list
     * @throws Exception any failure raised while querying or parsing; it is logged at WARN level
     *                   and then rethrown unchanged
     */
    private void collectClusterData(DefaultMQAdminExt mqAdminExt, RocketmqCollectData rocketmqCollectData) throws Exception {
        try {
            List<RocketmqCollectData.ClusterBrokerData> brokers = new ArrayList<>();
            rocketmqCollectData.setClusterBrokerDataList(brokers);

            ClusterInfo cluster = mqAdminExt.examineBrokerClusterInfo();
            for (BrokerData brokerGroup : cluster.getBrokerAddrTable().values()) {
                for (Map.Entry<Long, String> idToAddress : brokerGroup.getBrokerAddrs().entrySet()) {
                    String address = idToAddress.getValue();

                    // Register the entry first so it stays in the list even if the stats fetch fails.
                    RocketmqCollectData.ClusterBrokerData broker = new RocketmqCollectData.ClusterBrokerData();
                    brokers.add(broker);
                    broker.setBrokerId(idToAddress.getKey());
                    broker.setAddress(address);

                    applyBrokerRuntimeStats(broker, mqAdminExt.fetchBrokerRuntimeStats(address));
                }
            }
        } catch (Exception e) {
            log.warn("collect rocketmq cluster data error", e);
            throw e;
        }
    }

    /**
     * Copies the version, TPS figures and yesterday/today message counts from a broker's
     * runtime-stats table onto {@code broker}. A metric is only set when every stat it is
     * derived from is present and non-empty; otherwise the corresponding field is left untouched.
     *
     * @param broker       entry to populate
     * @param runtimeStats key/value table returned by the broker runtime-stats query
     * @throws NumberFormatException if a present stat value is not a parsable number
     */
    private static void applyBrokerRuntimeStats(RocketmqCollectData.ClusterBrokerData broker, KVTable runtimeStats) {
        Map<String, String> stats = runtimeStats.getTable();

        // Plain map lookups; all reads are done up front, the setters below keep their original order.
        String putTps = stats.get("putTps");
        String transferredTps = stats.get("getTransferedTps");
        String putAtYesterdayMorning = stats.get("msgPutTotalYesterdayMorning");
        String putAtTodayMorning = stats.get("msgPutTotalTodayMorning");
        String putNow = stats.get("msgPutTotalTodayNow");
        String getAtYesterdayMorning = stats.get("msgGetTotalYesterdayMorning");
        String getAtTodayMorning = stats.get("msgGetTotalTodayMorning");
        String getNow = stats.get("msgGetTotalTodayNow");

        broker.setVersion(stats.get("brokerVersionDesc"));

        if (StringUtils.isNotEmpty(putTps)) {
            broker.setProducerMessageTps(leadingTokenAsDouble(putTps));
        }
        if (StringUtils.isNotEmpty(transferredTps)) {
            broker.setConsumerMessageTps(leadingTokenAsDouble(transferredTps));
        }

        // Yesterday's volume = total at this morning's snapshot minus total at yesterday morning's snapshot.
        if (bothNotEmpty(putAtTodayMorning, putAtYesterdayMorning)) {
            broker.setYesterdayProduceCount(counterDifference(putAtTodayMorning, putAtYesterdayMorning));
        }
        if (bothNotEmpty(getAtTodayMorning, getAtYesterdayMorning)) {
            broker.setYesterdayConsumeCount(counterDifference(getAtTodayMorning, getAtYesterdayMorning));
        }

        // Today's volume so far = current total minus total at this morning's snapshot.
        if (bothNotEmpty(putNow, putAtTodayMorning)) {
            broker.setTodayProduceCount(counterDifference(putNow, putAtTodayMorning));
        }
        if (bothNotEmpty(getNow, getAtTodayMorning)) {
            broker.setTodayConsumeCount(counterDifference(getNow, getAtTodayMorning));
        }
    }

    /** Returns {@code true} when both stat values are present and non-empty. */
    private static boolean bothNotEmpty(String first, String second) {
        return StringUtils.isNotEmpty(first) && StringUtils.isNotEmpty(second);
    }

    /** Parses both counters as {@code long} (later value first) and returns {@code later - earlier}. */
    private static long counterDifference(String later, String earlier) {
        return Long.parseLong(later) - Long.parseLong(earlier);
    }

    /**
     * Parses the first space-separated token of a TPS stat as a {@code double}.
     * NOTE(review): presumably the value holds several sampling windows separated by spaces and
     * only the first is wanted — confirm against the broker's stat format.
     */
    private static double leadingTokenAsDouble(String spaceSeparated) {
        String[] tokens = spaceSeparated.split(" ");
        return Double.parseDouble(tokens[0]);
    }