public void makeGroupListCache()

in src/main/java/org/apache/rocketmq/dashboard/service/impl/ConsumerServiceImpl.java [176:238]


    /**
     * Rebuilds {@code cacheConsumeInfoList} from the subscription groups reported by the brokers.
     *
     * <p>Steps:
     * <ol>
     *   <li>Ask every broker in the cluster for its subscription groups, recording for each group
     *       the broker addresses that reported it and the group's configuration.</li>
     *   <li>Submit one task per group to {@code executorService}; each task calls
     *       {@code queryGroup(group, "")} and fills in address, type, name and update time.</li>
     *   <li>Wait for all tasks, sort the results and replace the cache contents.</li>
     * </ol>
     *
     * <p>{@code isCacheBeingBuilt} is set to {@code false} when the method returns normally
     * (including the "no groups" early return). It is not touched when an exception propagates.
     *
     * @throws RuntimeException if fetching the cluster info or a broker's subscription groups fails;
     *         unchecked exceptions are rethrown as-is, checked ones are wrapped
     */
    public void makeGroupListCache() {
        // group name -> addresses of the brokers that reported the group
        Map<String, List<String>> consumerGroupMap = Maps.newHashMap();
        // Union of all brokers' subscription tables. Keeping only the last broker's table would
        // leave groups that exist solely on other brokers without a config to look up.
        final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable = Maps.newConcurrentMap();
        try {
            ClusterInfo clusterInfo = clusterInfoService.get();
            for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
                // Resolve the address once so the address we query is the address we record.
                String brokerAddr = brokerData.selectBrokerAddr();
                SubscriptionGroupWrapper subscriptionGroupWrapper = mqAdminExt.getAllSubscriptionGroup(brokerAddr, 3000L);
                for (Map.Entry<String, SubscriptionGroupConfig> groupEntry
                        : subscriptionGroupWrapper.getSubscriptionGroupTable().entrySet()) {
                    String groupName = groupEntry.getKey();
                    consumerGroupMap.computeIfAbsent(groupName, ignored -> new ArrayList<>()).add(brokerAddr);
                    // Later brokers overwrite earlier ones, matching the previous behaviour for
                    // groups that are present on the last broker.
                    subscriptionGroupTable.put(groupName, groupEntry.getValue());
                }
            }
        } catch (Exception err) {
            Throwables.throwIfUnchecked(err);
            throw new RuntimeException(err);
        }

        // Covers both "no brokers at all" and "no broker reported any group".
        if (consumerGroupMap.isEmpty()) {
            logger.warn("No subscription group information available");
            isCacheBeingBuilt = false;
            return;
        }

        // Tasks append from pool threads, so the result list must be synchronized.
        List<GroupConsumeInfo> groupConsumeInfoList = Collections.synchronizedList(Lists.newArrayList());
        CountDownLatch countDownLatch = new CountDownLatch(consumerGroupMap.size());
        for (Map.Entry<String, List<String>> entry : consumerGroupMap.entrySet()) {
            String consumerGroup = entry.getKey();
            executorService.submit(() -> {
                try {
                    GroupConsumeInfo consumeInfo = queryGroup(consumerGroup, "");
                    consumeInfo.setAddress(entry.getValue());
                    if (SYSTEM_GROUP_SET.contains(consumerGroup)) {
                        consumeInfo.setSubGroupType("SYSTEM");
                    } else {
                        consumeInfo.setSubGroupType(subscriptionGroupTable.get(consumerGroup).isConsumeMessageOrderly() ? "FIFO" : "NORMAL");
                    }
                    consumeInfo.setGroup(consumerGroup);
                    consumeInfo.setUpdateTime(new Date());
                    groupConsumeInfoList.add(consumeInfo);
                } catch (Exception e) {
                    // A failing group is logged and left out; the remaining groups are still cached.
                    logger.error("queryGroup exception, consumerGroup: {}", consumerGroup, e);
                } finally {
                    // Always count down so the waiting thread cannot block forever on a failed task.
                    countDownLatch.countDown();
                }
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers; whatever results are already collected
            // are still published below.
            Thread.currentThread().interrupt();
            logger.error("Interruption occurred while waiting for task completion", e);
        }
        logger.info("All consumer group query tasks have been completed");
        isCacheBeingBuilt = false;
        Collections.sort(groupConsumeInfoList);

        cacheConsumeInfoList.clear();
        cacheConsumeInfoList.addAll(groupConsumeInfoList);
    }