in src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java [115:155]
/**
 * Collects broker stats for every cluster of every configured environment, then clears
 * old stats.
 *
 * <p>For each environment the cluster list is fetched through {@code clustersService}. A
 * cluster's {@code serviceUrl} may hold several comma-separated broker addresses; in that
 * case the first address that passes a broker health check is used. Clusters without a
 * usable address are skipped (and logged) so that one misconfigured cluster does not stop
 * collection for the others.
 *
 * <p>After collection, {@code clearStats} is invoked with the collection timestamp and
 * {@code clearStatsInterval / 1000} (presumably a milliseconds-to-seconds conversion,
 * matching the seconds-based {@code unixTime} - confirm against the property definition).
 */
private void scheduleCollectStats() {
    // One timestamp (epoch seconds) shared by every record written in this run.
    long unixTime = System.currentTimeMillis() / 1000L;
    List<EnvironmentEntity> environmentEntities = environmentsRepository.getAllEnvironments();
    Map<Pair<String, String>, String> collectStatsServiceUrls = new HashMap<>();
    for (EnvironmentEntity env : environmentEntities) {
        String serviceUrl = checkServiceUrl(null, env.getBroker());
        Map<String, Object> clusterObject =
                clustersService.getClustersList(0, 0, serviceUrl, (c) -> serviceUrl);
        // The "data" entry is only ever read here as a list of per-cluster maps.
        @SuppressWarnings("unchecked")
        List<HashMap<String, Object>> clusterLists = clusterObject == null
                ? null
                : (List<HashMap<String, Object>>) clusterObject.get("data");
        if (clusterLists == null) {
            log.warn("No cluster list returned for env {} @ {}, skip collecting stats",
                    env.getName(), serviceUrl);
            continue;
        }
        for (HashMap<String, Object> clusterMap : clusterLists) {
            String cluster = (String) clusterMap.get("cluster");
            String webServiceUrl = (String) clusterMap.get("serviceUrl");
            if (webServiceUrl == null) {
                log.warn("Cluster {} of env {} has no serviceUrl, skip collecting stats",
                        cluster, env.getName());
                continue;
            }
            if (webServiceUrl.contains(",")) {
                String reachableUrl = findReachableServiceUrl(webServiceUrl);
                if (reachableUrl == null) {
                    // Never hand the raw comma-separated list downstream as if it were a URL.
                    log.error("No reachable service url in [{}] for env {} / cluster {},"
                            + " skip collecting stats", webServiceUrl, env.getName(), cluster);
                    continue;
                }
                webServiceUrl = reachableUrl;
            }
            collectStatsServiceUrls.put(Pair.of(env.getName(), cluster), webServiceUrl);
        }
    }
    collectStatsServiceUrls.forEach((envCluster, serviceUrl) -> {
        log.info("Start collecting stats from env {} / cluster {} @ {}",
                envCluster.getLeft(), envCluster.getRight(), serviceUrl);
        collectStatsToDB(unixTime, envCluster.getLeft(), envCluster.getRight(), serviceUrl);
    });
    log.info("Start clearing stats from broker");
    clearStats(unixTime, clearStatsInterval / 1000);
}

/**
 * Returns the first address in a comma-separated list that passes a broker health check.
 *
 * @param commaSeparatedUrls broker web service addresses separated by {@code ,}
 * @return the first healthy address with an explicit scheme, or {@code null} if none
 *         of the candidates passed the health check
 */
private String findReachableServiceUrl(String commaSeparatedUrls) {
    for (String candidate : commaSeparatedUrls.split(",")) {
        if (candidate.trim().isEmpty()) {
            // Tolerate stray separators such as "a:8080,,b:8080" or a trailing comma.
            continue;
        }
        String url = withHttpScheme(candidate);
        try {
            Brokers brokers = pulsarAdminService.brokers(url);
            brokers.healthcheck();
            return url;
        } catch (PulsarAdminException e) {
            // Keep trying the remaining candidates, but record why this one was rejected.
            log.error("This service {} is down, please check", url, e);
        }
    }
    return null;
}

/**
 * Trims the address and prefixes {@code http://} unless it already starts with
 * {@code http://} or {@code https://}.
 */
private static String withHttpScheme(String url) {
    String trimmed = url.trim();
    if (trimmed.startsWith("http://") || trimmed.startsWith("https://")) {
        return trimmed;
    }
    return "http://" + trimmed;
}