in src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java [176:238]
public void makeGroupListCache() {
HashMap<String, List<String>> consumerGroupMap = Maps.newHashMap();
SubscriptionGroupWrapper subscriptionGroupWrapper = null;
try {
ClusterInfo clusterInfo = clusterInfoService.get();
for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
for (String groupName : subscriptionGroupWrapper.getSubscriptionGroupTable().keySet()) {
if (!consumerGroupMap.containsKey(groupName)) {
consumerGroupMap.putIfAbsent(groupName, new ArrayList<>());
}
List<String> addresses = consumerGroupMap.get(groupName);
addresses.add(brokerData.selectBrokerAddr());
consumerGroupMap.put(groupName, addresses);
}
}
} catch (Exception err) {
Throwables.throwIfUnchecked(err);
throw new RuntimeException(err);
}
if (subscriptionGroupWrapper != null && subscriptionGroupWrapper.getSubscriptionGroupTable().isEmpty()) {
logger.warn("No subscription group information available");
isCacheBeingBuilt = false;
return;
}
final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = subscriptionGroupWrapper.getSubscriptionGroupTable();
List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList());
CountDownLatch countDownLatch = new CountDownLatch(consumerGroupMap.size());
for (Map.Entry<String, List<String>> entry : consumerGroupMap.entrySet()) {
String consumerGroup = entry.getKey();
executorService.submit(() -> {
try {
GroupConsumeInfo consumeInfo = queryGroup(consumerGroup, "");
consumeInfo.setAddress(entry.getValue());
if (SYSTEM_GROUP_SET.contains(consumerGroup)) {
consumeInfo.setSubGroupType("SYSTEM");
} else {
consumeInfo.setSubGroupType(subscriptionGroupTable.get(consumerGroup).isConsumeMessageOrderly() ? "FIFO" : "NORMAL");
}
consumeInfo.setGroup(consumerGroup);
consumeInfo.setUpdateTime(new Date());
groupConsumeInfoList.add(consumeInfo);
} catch (Exception e) {
logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e);
} finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Interruption occurred while waiting for task completion", e);
}
logger.info("All consumer group query tasks have been completed");
isCacheBeingBuilt = false;
Collections.sort(groupConsumeInfoList);
cacheConsumeInfoList.clear();
cacheConsumeInfoList.addAll(groupConsumeInfoList);
}