in src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java [121:161]
private void scheduleCollectStats() {
long unixTime = System.currentTimeMillis() / 1000L;
List<EnvironmentEntity> environmentEntities = environmentsRepository.getAllEnvironments();
Map<Pair<String, String>, String> collectStatsServiceUrls = new HashMap<>();
for (EnvironmentEntity env : environmentEntities) {
String brokerUrl = env.getBroker();
Map<String, Object> clusterObject =
clustersService.getClustersList(0, 0, brokerUrl, (c) -> brokerUrl);
List<HashMap<String, Object>> clusterLists = (List<HashMap<String, Object>>) clusterObject.get("data");
clusterLists.forEach((clusterMap) -> {
String cluster = (String) clusterMap.get("cluster");
Pair<String, String> envCluster = Pair.of(env.getName(), cluster);
log.debug(envCluster.toString());
String serviceUrlTls = (String) clusterMap.get("serviceUrlTls");
String serviceUrl = (String) clusterMap.get("serviceUrl");
String webServiceUrl = StringUtils.isNotBlank(serviceUrlTls) ? serviceUrlTls : serviceUrl;
if (webServiceUrl.contains(",")) {
String[] webServiceUrlList = webServiceUrl.split(",");
for (String url : webServiceUrlList) {
try {
Brokers brokers = pulsarAdminService.brokers(url);
brokers.healthcheck();
webServiceUrl = url;
break;
} catch (PulsarAdminException e) {
log.error("This service {} is down, please check", url);
}
}
}
log.info("Start collecting stats from env {} / cluster {} @ {}", envCluster.getLeft(), envCluster.getRight(), serviceUrl);
collectStatsToDB(unixTime, envCluster.getLeft(), envCluster.getRight(), webServiceUrl);
});
}
log.info("Start clearing stats from broker");
clearStats(unixTime, clearStatsInterval / 1000);
}