public void collectProducer()

in src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java [207:252]


    /**
     * Collects producer-count metrics for every broker of every cluster returned by
     * {@code mqAdminExt.examineBrokerClusterInfo()}.
     *
     * <p>Returns immediately when collection is disabled in {@code rmqConfigure}, when the
     * cluster info cannot be fetched, or when it lacks a cluster or broker address table.
     * For each broker, the producer table is requested from the address registered under
     * {@code MixAll.MASTER_ID}; for each producer group in that table the size of its
     * producer list is reported through {@code addProducerCountMetric} ({@code -1} when the
     * list is {@code null}).
     *
     * <p>A failure on one broker is logged and skipped so the remaining brokers are still
     * processed; this method does not propagate exceptions raised by the admin calls inside
     * its try blocks.
     */
    public void collectProducer() {
        if (!rmqConfigure.isEnableCollect()) {
            return;
        }
        log.info("producer metric collection task starting....");
        ClusterInfo clusterInfo = null;
        try {
            clusterInfo = mqAdminExt.examineBrokerClusterInfo();
        } catch (Exception ex) {
            log.error(String.format("collectProducer exception namesrv is %s",
                    JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
            return;
        }

        if (clusterInfo == null || clusterInfo.getClusterAddrTable() == null || clusterInfo.getBrokerAddrTable() == null) {
            log.warn(String.format("collectProducer get empty cluster, namesrv is: %s", JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
            return;
        }
        for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) {
            Set<String> brokerNames = clusterInfo.getClusterAddrTable().get(clusterName);
            if (brokerNames == null || brokerNames.isEmpty()) {
                log.warn(String.format("collectProducer cluster's brokers are empty, cluster=%s, name srv= %s", clusterName, JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
                continue;
            }
            for (String brokerName : brokerNames) {
                // A broker listed for the cluster may have no entry in the broker address table;
                // guard explicitly instead of relying on a NullPointerException being caught below.
                BrokerData bd = clusterInfo.getBrokerAddrTable().get(brokerName);
                if (bd == null || bd.getBrokerAddrs() == null) {
                    log.warn(String.format("collectProducer. broker data is missing, cluster=%s, brokerName=%s, name srv= %s", clusterName, brokerName, JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
                    continue;
                }
                String masterAddr = bd.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr == null) {
                    log.warn(String.format("collectProducer. broker has no master address, cluster=%s, brokerName=%s, name srv= %s", clusterName, brokerName, JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
                    continue;
                }

                ProducerTableInfo pt = null;
                try {
                    pt = mqAdminExt.getAllProducerInfo(masterAddr);
                } catch (Exception e) {
                    // Keep the cause in the log, and skip this broker so the failure is not
                    // additionally reported as "there are no producers".
                    log.error(String.format("collectProducer. should not be here. cluster=%s, brokerName=%s, name srv= %s", clusterName, brokerName, JSON.toJSONString(mqAdminExt.getNameServerAddressList())), e);
                    continue;
                }
                if (pt == null || pt.getData() == null || pt.getData().isEmpty()) {
                    log.warn(String.format("collectProducer. there are no producers in cluster=%s, brokerName=%s, name srv= %s", clusterName, brokerName, JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
                    continue;
                }
                for (String producerGroup : pt.getData().keySet()) {
                    List<ProducerInfo> list = pt.getData().get(producerGroup);
                    // -1 marks a group that is present in the table but has a null producer list.
                    metricsService.getCollector().addProducerCountMetric(clusterName, brokerName, producerGroup, list == null ? -1 : list.size());
                }
            }
        }

        log.info("producer metric collection task ended....");
    }