private void scheduleCollectStats()

in src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java [121:161]


    private void scheduleCollectStats() {
        long unixTime = System.currentTimeMillis() / 1000L;
        List<EnvironmentEntity> environmentEntities = environmentsRepository.getAllEnvironments();
        Map<Pair<String, String>, String> collectStatsServiceUrls = new HashMap<>();
        for (EnvironmentEntity env : environmentEntities) {
            String brokerUrl = env.getBroker();
            Map<String, Object> clusterObject =
                clustersService.getClustersList(0, 0, brokerUrl, (c) -> brokerUrl);
            List<HashMap<String, Object>> clusterLists = (List<HashMap<String, Object>>) clusterObject.get("data");
            clusterLists.forEach((clusterMap) -> {
                String cluster = (String) clusterMap.get("cluster");
                Pair<String, String> envCluster = Pair.of(env.getName(), cluster);

                log.debug(envCluster.toString());

                String serviceUrlTls = (String) clusterMap.get("serviceUrlTls");
                String serviceUrl = (String) clusterMap.get("serviceUrl");

                String webServiceUrl = StringUtils.isNotBlank(serviceUrlTls) ? serviceUrlTls : serviceUrl;
                if (webServiceUrl.contains(",")) {
                    String[] webServiceUrlList = webServiceUrl.split(",");
                    for (String url : webServiceUrlList) {

                        try {
                            Brokers brokers = pulsarAdminService.brokers(url);
                            brokers.healthcheck();
                            webServiceUrl = url;
                            break;
                        } catch (PulsarAdminException e) {
                            log.error("This service {} is down, please check", url);
                        }
                    }
                }
                log.info("Start collecting stats from env {} / cluster {} @ {}", envCluster.getLeft(), envCluster.getRight(), serviceUrl);
                collectStatsToDB(unixTime, envCluster.getLeft(), envCluster.getRight(), webServiceUrl);
            });
        }

        log.info("Start clearing stats from broker");
        clearStats(unixTime, clearStatsInterval / 1000);
    }