in src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java [163:298]
/**
 * Collects topic statistics from every broker of a cluster and stores them through the
 * stats repositories.
 *
 * <p>For each broker returned by {@code brokersService.getBrokersList} the broker stats are
 * fetched via {@code pulsarAdminService.brokerStats}. Every topic found in the response is saved
 * as a {@link TopicStatsEntity}; its subscriptions (with their consumers), publishers and
 * replications are saved as child rows that reference the id returned when the topic row was
 * saved. A broker whose stats cannot be fetched is logged and skipped; the remaining brokers are
 * still processed.
 *
 * @param unixTime   timestamp written to every stored row
 * @param env        environment name, stored on the topic row and passed to the admin service
 * @param cluster    cluster name, used to list the brokers and stored on the topic row
 * @param serviceUrl web service URL; its scheme and port are combined with each broker host to
 *                   build the URL the stats are fetched from
 */
public void collectStatsToDB(long unixTime, String env, String cluster, String serviceUrl) {
    Map<String, Object> brokerObject = brokersService.getBrokersList(0, 0, cluster, serviceUrl);
    @SuppressWarnings("unchecked") // "data" is read as a list of per-broker maps
    List<HashMap<String, Object>> brokerLists = (List<HashMap<String, Object>>) brokerObject.get("data");
    if (brokerLists == null || brokerLists.isEmpty()) {
        log.warn("No brokers found for cluster {}, skip collecting stats", cluster);
        return;
    }

    // Scheme and port come from the web service url and are identical for every broker,
    // so the url is parsed once instead of once per broker.
    UriComponents serviceURI = UriComponentsBuilder.fromHttpUrl(serviceUrl).build();
    Gson gson = new Gson();
    // Locale.ROOT symbols guarantee '.' as the decimal separator; with the default locale the
    // formatted text may contain ',' which Double.parseDouble rejects.
    // DecimalFormat is not thread-safe, so the instance stays local to this invocation.
    DecimalFormat df = new DecimalFormat("#.##",
            java.text.DecimalFormatSymbols.getInstance(java.util.Locale.ROOT));

    brokerLists.forEach((brokerMap) -> {
        // returns [Broker Hostname]:[Broker non Tls port]
        String broker = (String) brokerMap.get("broker");
        if (broker == null) {
            log.warn("Skipping broker entry without a broker address: {}", brokerMap);
            return;
        }
        log.info("processing broker: {}", broker);
        // use web service url scheme and port, replace the host part with the broker host
        String finalBroker = UriComponentsBuilder.newInstance()
                .scheme(serviceURI.getScheme())
                .host(broker.split(":")[0])
                .port(serviceURI.getPort())
                .toUriString();
        JsonObject result;
        try {
            log.info("Start collecting stats from broker {}", finalBroker);
            result = pulsarAdminService.brokerStats(finalBroker, env).getTopics();
        } catch (PulsarAdminException e) {
            log.error("Failed to get broker metrics.", e);
            // Inside the lambda this only skips the current broker.
            return;
        }
        // Nesting of the response: namespace -> bundle -> persistent -> topic -> stats
        HashMap<String, HashMap<String, HashMap<String, HashMap<String, PulsarManagerTopicStats>>>> brokerStatsTopicEntity =
                gson.fromJson(result,
                        new TypeToken<HashMap<String, HashMap<String, HashMap<String, HashMap<String, PulsarManagerTopicStats>>>>>() {
                        }.getType());
        if (brokerStatsTopicEntity == null) {
            log.warn("Broker {} returned no topic stats", finalBroker);
            return;
        }
        brokerStatsTopicEntity.forEach((namespace, namespaceStats) -> {
            namespaceStats.forEach((bundle, bundleStats) -> {
                bundleStats.forEach((persistent, persistentStats) -> {
                    persistentStats.forEach((topic, topicStats) -> {
                        TopicStatsEntity topicStatsEntity = new TopicStatsEntity();
                        // parseTopic yields [tenant, namespace, topic] as used below
                        String[] topicPath = this.parseTopic(topic);
                        topicStatsEntity.setEnvironment(env);
                        topicStatsEntity.setCluster(cluster);
                        topicStatsEntity.setBroker(finalBroker);
                        topicStatsEntity.setTenant(topicPath[0]);
                        topicStatsEntity.setNamespace(topicPath[1]);
                        topicStatsEntity.setBundle(bundle);
                        topicStatsEntity.setPersistent(persistent);
                        topicStatsEntity.setTopic(topicPath[2]);
                        topicStatsEntity.setMsgRateIn(roundToTwoDecimals(df, topicStats.getMsgRateIn()));
                        topicStatsEntity.setMsgRateOut(roundToTwoDecimals(df, topicStats.getMsgRateOut()));
                        topicStatsEntity.setMsgThroughputIn(roundToTwoDecimals(df, topicStats.getMsgThroughputIn()));
                        topicStatsEntity.setMsgThroughputOut(roundToTwoDecimals(df, topicStats.getMsgThroughputOut()));
                        topicStatsEntity.setAverageMsgSize(roundToTwoDecimals(df, topicStats.getAverageMsgSize()));
                        topicStatsEntity.setStorageSize(roundToTwoDecimals(df, topicStats.getStorageSize()));
                        // Subscriptions and publishers may be absent (they are null-checked below),
                        // so the counts must not dereference them blindly.
                        topicStatsEntity.setSubscriptionCount(
                                topicStats.getSubscriptions() == null ? 0 : topicStats.getSubscriptions().size());
                        topicStatsEntity.setProducerCount(
                                topicStats.getPublishers() == null ? 0 : topicStats.getPublishers().size());
                        topicStatsEntity.setTime_stamp(unixTime);
                        // The returned id links all child rows to this topic row.
                        long topicStatsId = topicsStatsRepository.save(topicStatsEntity);
                        if (topicStats.getSubscriptions() != null) {
                            topicStats.getSubscriptions().forEach((subscription, subscriptionStats) -> {
                                SubscriptionStatsEntity subscriptionStatsEntity = new SubscriptionStatsEntity();
                                subscriptionStatsEntity.setTopicStatsId(topicStatsId);
                                subscriptionStatsEntity.setSubscription(subscription);
                                subscriptionStatsEntity.setMsgRateOut(roundToTwoDecimals(df, subscriptionStats.getMsgRateOut()));
                                subscriptionStatsEntity.setMsgThroughputOut(roundToTwoDecimals(df, subscriptionStats.getMsgThroughputOut()));
                                subscriptionStatsEntity.setMsgRateRedeliver(roundToTwoDecimals(df, subscriptionStats.getMsgRateRedeliver()));
                                subscriptionStatsEntity.setNumberOfEntriesSinceFirstNotAckedMessage(
                                        subscriptionStats.getNumberOfEntriesSinceFirstNotAckedMessage());
                                subscriptionStatsEntity.setTotalNonContiguousDeletedMessagesRange(
                                        subscriptionStats.getTotalNonContiguousDeletedMessagesRange());
                                subscriptionStatsEntity.setMsgBacklog(subscriptionStats.getMsgBacklog());
                                subscriptionStatsEntity.setSubscriptionType(String.valueOf(subscriptionStats.getType()));
                                subscriptionStatsEntity.setMsgRateExpired(roundToTwoDecimals(df, subscriptionStats.getMsgRateExpired()));
                                subscriptionStatsEntity.setReplicated(subscriptionStats.isReplicated());
                                subscriptionStatsEntity.setTime_stamp(unixTime);
                                long subscriptionStatsId = subscriptionsStatsRepository.save(subscriptionStatsEntity);
                                if (subscriptionStats.getConsumers() != null) {
                                    subscriptionStats.getConsumers().forEach((consumerStats) -> {
                                        ConsumerStatsEntity consumerStatsEntity = new ConsumerStatsEntity();
                                        consumerStatsEntity.setSubscriptionStatsId(subscriptionStatsId);
                                        consumerStatsEntity.setTopicStatsId(topicStatsId);
                                        // These consumers belong to a subscription, not to a replication.
                                        consumerStatsEntity.setReplicationStatsId(-1);
                                        consumerStatsEntity.setConsumer(consumerStats.getConsumerName());
                                        consumerStatsEntity.setMsgRateOut(roundToTwoDecimals(df, consumerStats.getMsgRateOut()));
                                        consumerStatsEntity.setMsgThroughputOut(roundToTwoDecimals(df, consumerStats.getMsgThroughputOut()));
                                        consumerStatsEntity.setMsgRateRedeliver(roundToTwoDecimals(df, consumerStats.getMsgRateRedeliver()));
                                        consumerStatsEntity.setAvailablePermits(consumerStats.getAvailablePermits());
                                        consumerStatsEntity.setAddress(consumerStats.getAddress());
                                        consumerStatsEntity.setConnectedSince(consumerStats.getConnectedSince());
                                        consumerStatsEntity.setClientVersion(consumerStats.getClientVersion());
                                        consumerStatsEntity.setMetadata(gson.toJson(consumerStats.getMetadata()));
                                        consumerStatsEntity.setTime_stamp(unixTime);
                                        consumersStatsRepository.save(consumerStatsEntity);
                                    });
                                }
                            });
                        }
                        if (topicStats.getPublishers() != null) {
                            topicStats.getPublishers().forEach((producer) -> {
                                PublisherStatsEntity publisherStatsEntity = new PublisherStatsEntity();
                                publisherStatsEntity.setTopicStatsId(topicStatsId);
                                publisherStatsEntity.setProducerId(producer.getProducerId());
                                publisherStatsEntity.setProducerName(producer.getProducerName());
                                publisherStatsEntity.setMsgRateIn(roundToTwoDecimals(df, producer.getMsgRateIn()));
                                publisherStatsEntity.setMsgThroughputIn(roundToTwoDecimals(df, producer.getMsgThroughputIn()));
                                publisherStatsEntity.setAverageMsgSize(roundToTwoDecimals(df, producer.getAverageMsgSize()));
                                publisherStatsEntity.setAddress(producer.getAddress());
                                publisherStatsEntity.setConnectedSince(producer.getConnectedSince());
                                publisherStatsEntity.setClientVersion(producer.getClientVersion());
                                publisherStatsEntity.setMetadata(gson.toJson(producer.getMetadata()));
                                publisherStatsEntity.setTime_stamp(unixTime);
                                publishersStatsRepository.save(publisherStatsEntity);
                            });
                        }
                        if (topicStats.getReplication() != null) {
                            topicStats.getReplication().forEach((replication, replicatorStats) -> {
                                ReplicationStatsEntity replicationStatsEntity = new ReplicationStatsEntity();
                                replicationStatsEntity.setCluster(replication);
                                replicationStatsEntity.setTopicStatsId(topicStatsId);
                                replicationStatsEntity.setMsgRateIn(roundToTwoDecimals(df, replicatorStats.getMsgRateIn()));
                                replicationStatsEntity.setMsgThroughputIn(roundToTwoDecimals(df, replicatorStats.getMsgThroughputIn()));
                                replicationStatsEntity.setMsgRateOut(roundToTwoDecimals(df, replicatorStats.getMsgRateOut()));
                                replicationStatsEntity.setMsgThroughputOut(roundToTwoDecimals(df, replicatorStats.getMsgThroughputOut()));
                                replicationStatsEntity.setMsgRateExpired(roundToTwoDecimals(df, replicatorStats.getMsgRateExpired()));
                                replicationStatsEntity.setReplicationBacklog(replicatorStats.getReplicationBacklog());
                                replicationStatsEntity.setConnected(replicatorStats.isConnected());
                                replicationStatsEntity.setReplicationDelayInSeconds(replicatorStats.getReplicationDelayInSeconds());
                                replicationStatsEntity.setInboundConnection(replicatorStats.getInboundConnection());
                                replicationStatsEntity.setInboundConnectedSince(replicatorStats.getInboundConnectedSince());
                                replicationStatsEntity.setOutboundConnection(replicatorStats.getOutboundConnection());
                                replicationStatsEntity.setOutboundConnectedSince(replicatorStats.getOutboundConnectedSince());
                                replicationStatsEntity.setTime_stamp(unixTime);
                                replicationsStatsRepository.save(replicationStatsEntity);
                            });
                        }
                    });
                });
            });
        });
    });
}

/**
 * Rounds a metric by formatting it with the given format and parsing the text back.
 *
 * @param format format whose pattern limits the fraction digits; it must emit '.' as the
 *               decimal separator so that the text can be parsed again
 * @param value  metric value to round
 * @return the value parsed from the formatted text
 */
private static double roundToTwoDecimals(DecimalFormat format, Number value) {
    return Double.parseDouble(format.format(value));
}