private void scheduleCollectStats()

in src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java [115:155]


    /**
     * Collects broker stats for every cluster of every configured environment and then
     * triggers the stats clean-up.
     *
     * <p>For each environment returned by {@code environmentsRepository}, the cluster list is
     * fetched through {@code clustersService}. Each (environment, cluster) pair is mapped to the
     * web service URL stats are collected from; when a cluster advertises several
     * comma-separated URLs, the first one that passes a broker health check is chosen.
     * Finally {@code clearStats} is invoked with the same timestamp and
     * {@code clearStatsInterval / 1000} (presumably a milliseconds-to-seconds conversion --
     * confirm against the field's declaration).
     */
    private void scheduleCollectStats() {
        // Single timestamp (epoch seconds) shared by every collection of this run.
        long unixTime = System.currentTimeMillis() / 1000L;
        List<EnvironmentEntity> environmentEntities = environmentsRepository.getAllEnvironments();
        Map<Pair<String, String>, String> collectStatsServiceUrls = new HashMap<>();
        for (EnvironmentEntity env : environmentEntities) {
            String serviceUrl = checkServiceUrl(null, env.getBroker());
            Map<String, Object> clusterObject =
                clustersService.getClustersList(0, 0, serviceUrl, (c) -> serviceUrl);
            // The "data" entry is produced by clustersService as a list of per-cluster maps.
            @SuppressWarnings("unchecked")
            List<HashMap<String, Object>> clusterLists =
                (List<HashMap<String, Object>>) clusterObject.get("data");
            if (clusterLists == null) {
                // Do not let one environment without cluster data abort the whole run.
                log.warn("No cluster data returned for env {}, skipping", env.getName());
                continue;
            }
            for (HashMap<String, Object> clusterMap : clusterLists) {
                String cluster = (String) clusterMap.get("cluster");
                String webServiceUrl = (String) clusterMap.get("serviceUrl");
                if (webServiceUrl == null) {
                    log.warn("Cluster {} of env {} has no serviceUrl, skipping", cluster, env.getName());
                    continue;
                }
                collectStatsServiceUrls.put(
                    Pair.of(env.getName(), cluster), selectHealthyServiceUrl(webServiceUrl));
            }
        }
        collectStatsServiceUrls.forEach((envCluster, serviceUrl) -> {
            log.info("Start collecting stats from env {} / cluster {} @ {}",
                envCluster.getLeft(), envCluster.getRight(), serviceUrl);
            collectStatsToDB(unixTime, envCluster.getLeft(), envCluster.getRight(), serviceUrl);
        });

        log.info("Start clearing stats from broker");
        clearStats(unixTime, clearStatsInterval / 1000);
    }

    /**
     * Chooses the URL to use for a cluster whose web service URL may be a comma-separated list.
     *
     * <p>A value without a comma is returned unchanged. Otherwise each entry is trimmed, given
     * an {@code http://} prefix when it carries neither an {@code http://} nor an
     * {@code https://} scheme, and health-checked in order; the first entry whose health check
     * succeeds is returned.
     *
     * @param webServiceUrl the non-null web service URL value reported for the cluster
     * @return the first healthy entry, or {@code webServiceUrl} itself when it is a single URL
     *         or when no entry passed the health check
     */
    private String selectHealthyServiceUrl(String webServiceUrl) {
        if (!webServiceUrl.contains(",")) {
            return webServiceUrl;
        }
        for (String candidate : webServiceUrl.split(",")) {
            String url = candidate.trim();
            if (url.isEmpty()) {
                continue;
            }
            // Only add a scheme when none is present; an "https://" entry must stay untouched.
            if (!url.startsWith("http://") && !url.startsWith("https://")) {
                url = "http://" + url;
            }
            try {
                Brokers brokers = pulsarAdminService.brokers(url);
                brokers.healthcheck();
                return url;
            } catch (PulsarAdminException e) {
                // Keep the cause in the log and move on to the next candidate.
                log.error("This service {} is down, please check", url, e);
            }
        }
        // No candidate was healthy: keep the original value, as the previous implementation did.
        return webServiceUrl;
    }