in src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java [255:418]
public void collectConsumerOffset() {
    // Purpose: walk every non-DLQ topic returned by the name server and, for each
    // consumer group that consumes it, publish to the metrics collector:
    //   - the number (and addresses) of online consumers of the group,
    //   - the total consume diff (CLUSTERING groups only),
    //   - the consumer offset summed per broker,
    //   - the largest store-time based consume latency per broker (CLUSTERING groups only).
    // It also submits a ClientMetricTaskRunnable for every group that has online consumers.
    //
    // Error policy: a failure on one topic/group is logged (or routed through
    // handleTopicNotExistException) and the loop moves on. The one exception is a
    // thread interrupt: the interrupt flag is restored and the run stops, because
    // every further blocking admin call would be expected to fail the same way.
    if (!rmqConfigure.isEnableCollect()) {
        return;
    }
    log.info("consumer offset collection task starting....");
    long start = System.currentTimeMillis();

    TopicList topicList;
    try {
        topicList = mqAdminExt.fetchAllTopicList();
    } catch (Exception ex) {
        log.error(String.format("collectConsumerOffset-fetch topic list from namesrv error, the address is %s",
            JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
        if (ex instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        return;
    }

    Set<String> topicSet = topicList.getTopicList();
    for (String topic : topicSet) {
        // Dead-letter-queue topics are skipped entirely.
        if (topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) {
            continue;
        }

        GroupList groupList;
        try {
            groupList = mqAdminExt.queryTopicConsumeByWho(topic);
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                Thread.currentThread().interrupt();
                return;
            }
            // Best effort: this topic is skipped. Logged at debug level only, to keep
            // the regular log quiet while still leaving a trace of the failure.
            log.debug(String.format("collectConsumerOffset-query consumer groups of topic(%s) failed, ignore this topic", topic), ex);
            continue;
        }
        if (groupList == null || groupList.getGroupList() == null || groupList.getGroupList().isEmpty()) {
            // No consumer group for this topic: nothing to report.
            continue;
        }

        for (String group : groupList.getGroupList()) {
            // ---- online consumers of the group -------------------------------------
            ConsumerConnection onlineConsumers = null;
            // CLUSTERING is the fallback when the connection info is missing or carries no model.
            MessageModel messageModel = MessageModel.CLUSTERING;
            try {
                onlineConsumers = mqAdminExt.examineConsumerConnectionInfo(group);
                if (onlineConsumers != null && onlineConsumers.getMessageModel() != null) {
                    messageModel = onlineConsumers.getMessageModel();
                }
            } catch (InterruptedException ex) {
                log.error(String.format("get topic's(%s) online consumers(%s) exception", topic, group), ex);
                Thread.currentThread().interrupt();
                return;
            } catch (RemotingException ex) {
                log.error(String.format("get topic's(%s) online consumers(%s) exception", topic, group), ex);
            } catch (MQClientException ex) {
                handleTopicNotExistException(ex.getResponseCode(), ex, topic, group);
            } catch (MQBrokerException ex) {
                handleTopicNotExistException(ex.getResponseCode(), ex, topic, group);
            }

            int countOfOnlineConsumers = 0;
            if (onlineConsumers != null && onlineConsumers.getConnectionSet() != null) {
                countOfOnlineConsumers = onlineConsumers.getConnectionSet().size();
            }

            // The group-count metric is always published; the address labels stay empty
            // when no consumer is online.
            String cAddrs = "";
            String localAddrs = "";
            if (countOfOnlineConsumers > 0) {
                TwoTuple<String, String> addresses = buildClientAddresses(onlineConsumers.getConnectionSet());
                cAddrs = addresses.getFirst();
                localAddrs = addresses.getSecond();
            }
            metricsService.getCollector().addGroupCountMetric(group, cAddrs, localAddrs, countOfOnlineConsumers);

            if (countOfOnlineConsumers > 0) {
                collectClientMetricExecutor.submit(new ClientMetricTaskRunnable(
                    group,
                    onlineConsumers,
                    false,
                    this.mqAdminExt,
                    log,
                    this.metricsService
                ));
            }

            // ---- consume stats of (group, topic) -----------------------------------
            ConsumeStats consumeStats = null;
            try {
                consumeStats = mqAdminExt.examineConsumeStats(group, topic);
            } catch (InterruptedException ex) {
                log.error(String.format("get topic's(%s) consumer-stats(%s) exception", topic, group), ex);
                Thread.currentThread().interrupt();
                return;
            } catch (RemotingException ex) {
                log.error(String.format("get topic's(%s) consumer-stats(%s) exception", topic, group), ex);
            } catch (MQClientException ex) {
                handleTopicNotExistException(ex.getResponseCode(), ex, topic, group);
            } catch (MQBrokerException ex) {
                handleTopicNotExistException(ex.getResponseCode(), ex, topic, group);
            }
            if (consumeStats == null || consumeStats.getOffsetTable() == null || consumeStats.getOffsetTable().isEmpty()) {
                // No offsets recorded for this group on this topic: nothing more to report.
                continue;
            }

            if (messageModel == MessageModel.CLUSTERING) {
                long diff = consumeStats.computeTotalDiff();
                metricsService.getCollector().addGroupDiffMetric(
                    String.valueOf(countOfOnlineConsumers),
                    group,
                    topic,
                    String.valueOf(messageModel.ordinal()),
                    diff
                );
            }

            // ---- consumer offset, summed per broker --------------------------------
            try {
                Map<String, Long> consumeOffsetMap = new HashMap<>();
                for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStats.getOffsetTable().entrySet()) {
                    String brokerName = consumeStatusEntry.getKey().getBrokerName();
                    long consumerOffset = consumeStatusEntry.getValue().getConsumerOffset();
                    Long subtotal = consumeOffsetMap.get(brokerName);
                    consumeOffsetMap.put(brokerName, subtotal == null ? consumerOffset : subtotal + consumerOffset);
                }
                for (Map.Entry<String, Long> consumeOffsetEntry : consumeOffsetMap.entrySet()) {
                    metricsService.getCollector().addGroupBrokerTotalOffsetMetric(clusterName,
                        consumeOffsetEntry.getKey(), topic, group, consumeOffsetEntry.getValue());
                }
            } catch (Exception ex) {
                log.warn("addGroupBrokerTotalOffsetMetric error", ex);
            }

            // ---- consume latency per broker (CLUSTERING only) ----------------------
            if (MessageModel.CLUSTERING != messageModel) {
                continue;
            }
            try {
                MQAdminExtImpl adminImpl = (MQAdminExtImpl) mqAdminExt;
                Map<String, Long> consumerLatencyMap = new HashMap<>();
                for (Map.Entry<MessageQueue, OffsetWrapper> consumeStatusEntry : consumeStats.getOffsetTable().entrySet()) {
                    MessageQueue q = consumeStatusEntry.getKey();
                    OffsetWrapper offset = consumeStatusEntry.getValue();
                    PullResult consumePullResult = adminImpl.queryMsgByOffset(q, offset.getConsumerOffset());

                    long lagTime = 0;
                    // A null pull result means "no information": lag stays 0 for this queue
                    // instead of failing the whole topic/group with a NullPointerException.
                    if (consumePullResult != null) {
                        if (consumePullResult.getPullStatus() == PullStatus.FOUND) {
                            // Consumer has caught up with the broker: report no lag.
                            if (offset.getBrokerOffset() != offset.getConsumerOffset()) {
                                lagTime = System.currentTimeMillis() - consumePullResult.getMsgFoundList().get(0).getStoreTimestamp();
                            }
                        } else if (consumePullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL) {
                            // The consumer offset is not readable; measure against the message
                            // at the minimum offset reported by the pull result instead.
                            PullResult pullResult = adminImpl.queryMsgByOffset(q, consumePullResult.getMinOffset());
                            if (pullResult != null && pullResult.getPullStatus() == PullStatus.FOUND) {
                                lagTime = System.currentTimeMillis() - pullResult.getMsgFoundList().get(0).getStoreTimestamp();
                            }
                        }
                    }

                    // Keep the largest non-negative lag seen for each broker.
                    long effectiveLag = Math.max(lagTime, 0L);
                    Long worstSoFar = consumerLatencyMap.get(q.getBrokerName());
                    if (worstSoFar == null || effectiveLag > worstSoFar) {
                        consumerLatencyMap.put(q.getBrokerName(), effectiveLag);
                    }
                }
                for (Map.Entry<String, Long> consumeLatencyEntry : consumerLatencyMap.entrySet()) {
                    metricsService.getCollector().addGroupGetLatencyByStoreTimeMetric(clusterName,
                        consumeLatencyEntry.getKey(), topic, group, consumeLatencyEntry.getValue());
                }
            } catch (Exception ex) {
                log.warn("addGroupGetLatencyByStoreTimeMetric error", ex);
                if (ex instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
    log.info("consumer offset collection task finished...." + (System.currentTimeMillis() - start));
}