in src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java [207:252]
public void collectProducer() {
if (!rmqConfigure.isEnableCollect()) {
return;
}
log.info("producer metric collection task starting....");
long start = System.currentTimeMillis();
ClusterInfo clusterInfo = null;
try {
clusterInfo = mqAdminExt.examineBrokerClusterInfo();
} catch (Exception ex) {
log.error(String.format("collectProducer exception namesrv is %s",
JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
return;
}
if (clusterInfo == null || clusterInfo.getClusterAddrTable() == null || clusterInfo.getBrokerAddrTable() == null) {
log.warn(String.format("collectProducer get empty cluster, namesrv is: %s", JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
return;
}
for (String clusterName : clusterInfo.getClusterAddrTable().keySet()) {
Set<String> brokerNames = clusterInfo.getClusterAddrTable().get(clusterName);
if (brokerNames == null || brokerNames.isEmpty()) {
log.warn(String.format("collectProducer cluster's brokers are empty, cluster=%s, name srv= %s", clusterName, JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
continue;
}
for (String brokerName : brokerNames) {
BrokerData bd = clusterInfo.getBrokerAddrTable().get(brokerName);
ProducerTableInfo pt = null;
try {
pt = mqAdminExt.getAllProducerInfo(bd.getBrokerAddrs().get(MixAll.MASTER_ID));
} catch (Exception e) {
log.error(String.format("collectProducer. should not be here. cluster=%s, brokerName=%s, name srv= %s", clusterName, brokerName, JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
}
if (pt == null || pt.getData() == null || pt.getData().isEmpty()) {
log.warn(String.format("collectProducer. there are no producers in cluster=%s, brokerName=%s, name srv= %s", clusterName, brokerName, JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
continue;
}
for (String producerGroup : pt.getData().keySet()) {
List<ProducerInfo> list = pt.getData().get(producerGroup);
metricsService.getCollector().addProducerCountMetric(clusterName, brokerName, producerGroup, list == null ? -1 : list.size());
}
}
}
log.info("producer metric collection task ended....");
}