public void collectStatsToDB()

in src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java [163:298]


    public void collectStatsToDB(long unixTime, String env, String cluster, String serviceUrl) {
        Map<String, Object> brokerObject = brokersService.getBrokersList(0, 0, cluster, serviceUrl);
        List<HashMap<String, Object>> brokerLists = (List<HashMap<String, Object>>) brokerObject.get("data");
        brokerLists.forEach((brokerMap) -> {
            // returns [Broker Hostname]:[Broker non Tls port]
            String broker = (String) brokerMap.get("broker");
            log.info("processing broker: {}", broker);

            // use web service url scheme to replace host part with broker
            UriComponents serviceURI = UriComponentsBuilder.fromHttpUrl(serviceUrl).build();
            UriComponentsBuilder builder = UriComponentsBuilder.newInstance()
                    .scheme(serviceURI.getScheme())
                    .host(broker.split(":")[0])
                    .port(serviceURI.getPort());
            String finalBroker = builder.toUriString();

            JsonObject result;
            try {
                log.info("Start collecting stats from broker {}", finalBroker);
                result = pulsarAdminService.brokerStats(finalBroker, env).getTopics();
            } catch(PulsarAdminException e) {
                log.error("Failed to get broker metrics.", e);
                return;
            }
            Gson gson = new Gson();
            HashMap<String, HashMap<String, HashMap<String, HashMap<String, PulsarManagerTopicStats>>>> brokerStatsTopicEntity = gson.fromJson(result,
                new TypeToken<HashMap<String, HashMap<String, HashMap<String, HashMap<String, PulsarManagerTopicStats>>>>>() {
                }.getType());
            brokerStatsTopicEntity.forEach((namespace, namespaceStats) -> {
                namespaceStats.forEach((bundle, bundleStats) -> {
                    bundleStats.forEach((persistent, persistentStats) -> {
                        persistentStats.forEach((topic, topicStats) -> {
                            DecimalFormat df = new DecimalFormat("#.##");
                            TopicStatsEntity topicStatsEntity = new TopicStatsEntity();
                            String[] topicPath = this.parseTopic(topic);
                            topicStatsEntity.setEnvironment(env);
                            topicStatsEntity.setCluster(cluster);
                            topicStatsEntity.setBroker(finalBroker);
                            topicStatsEntity.setTenant(topicPath[0]);
                            topicStatsEntity.setNamespace(topicPath[1]);
                            topicStatsEntity.setBundle(bundle);
                            topicStatsEntity.setPersistent(persistent);
                            topicStatsEntity.setTopic(topicPath[2]);
                            topicStatsEntity.setMsgRateIn(Double.parseDouble(df.format(topicStats.getMsgRateIn())));
                            topicStatsEntity.setMsgRateOut(Double.parseDouble(df.format(topicStats.getMsgRateOut())));
                            topicStatsEntity.setMsgThroughputIn(Double.parseDouble(df.format(topicStats.getMsgThroughputIn())));
                            topicStatsEntity.setMsgThroughputOut(Double.parseDouble(df.format(topicStats.getMsgThroughputOut())));
                            topicStatsEntity.setAverageMsgSize(Double.parseDouble(df.format(topicStats.getAverageMsgSize())));
                            topicStatsEntity.setStorageSize(Double.parseDouble(df.format(topicStats.getStorageSize())));
                            topicStatsEntity.setSubscriptionCount(topicStats.getSubscriptions().size());
                            topicStatsEntity.setProducerCount(topicStats.getPublishers().size());
                            topicStatsEntity.setTime_stamp(unixTime);
                            long topicStatsId = topicsStatsRepository.save(topicStatsEntity);
                            if (topicStats.getSubscriptions() != null) {
                                topicStats.getSubscriptions().forEach((subscription, subscriptionStats) -> {
                                    SubscriptionStatsEntity subscriptionStatsEntity = new SubscriptionStatsEntity();
                                    subscriptionStatsEntity.setTopicStatsId(topicStatsId);
                                    subscriptionStatsEntity.setSubscription(subscription);
                                    subscriptionStatsEntity.setMsgRateOut(Double.parseDouble(df.format(subscriptionStats.getMsgRateOut())));
                                    subscriptionStatsEntity.setMsgThroughputOut(Double.parseDouble(df.format(subscriptionStats.getMsgThroughputOut())));
                                    subscriptionStatsEntity.setMsgRateRedeliver(Double.parseDouble(df.format(subscriptionStats.getMsgRateRedeliver())));
                                    subscriptionStatsEntity.setNumberOfEntriesSinceFirstNotAckedMessage(
                                        subscriptionStats.getNumberOfEntriesSinceFirstNotAckedMessage());
                                    subscriptionStatsEntity.setTotalNonContiguousDeletedMessagesRange(
                                        subscriptionStats.getTotalNonContiguousDeletedMessagesRange());
                                    subscriptionStatsEntity.setMsgBacklog(subscriptionStats.getMsgBacklog());
                                    subscriptionStatsEntity.setSubscriptionType(String.valueOf(subscriptionStats.getType()));
                                    subscriptionStatsEntity.setMsgRateExpired(Double.parseDouble(df.format(subscriptionStats.getMsgRateExpired())));
                                    subscriptionStatsEntity.setReplicated(subscriptionStats.isReplicated());
                                    subscriptionStatsEntity.setTime_stamp(unixTime);
                                    long subscriptionStatsId = subscriptionsStatsRepository.save(subscriptionStatsEntity);
                                    if (subscriptionStats.getConsumers() != null) {
                                        subscriptionStats.getConsumers().forEach((consumerStats) -> {
                                            ConsumerStatsEntity consumerStatsEntity = new ConsumerStatsEntity();
                                            consumerStatsEntity.setSubscriptionStatsId(subscriptionStatsId);
                                            consumerStatsEntity.setTopicStatsId(topicStatsId);
                                            consumerStatsEntity.setReplicationStatsId(-1);
                                            consumerStatsEntity.setConsumer(consumerStats.getConsumerName());
                                            consumerStatsEntity.setMsgRateOut(Double.parseDouble(df.format(consumerStats.getMsgRateOut())));
                                            consumerStatsEntity.setMsgThroughputOut(Double.parseDouble(df.format(consumerStats.getMsgThroughputOut())));
                                            consumerStatsEntity.setMsgRateRedeliver(Double.parseDouble(df.format(consumerStats.getMsgRateRedeliver())));
                                            consumerStatsEntity.setAvailablePermits(consumerStats.getAvailablePermits());
                                            consumerStatsEntity.setAddress(consumerStats.getAddress());
                                            consumerStatsEntity.setConnectedSince(consumerStats.getConnectedSince());
                                            consumerStatsEntity.setClientVersion(consumerStats.getClientVersion());
                                            consumerStatsEntity.setMetadata(gson.toJson(consumerStats.getMetadata()));
                                            consumerStatsEntity.setTime_stamp(unixTime);
                                            consumersStatsRepository.save(consumerStatsEntity);
                                        });
                                    }
                                });
                            }
                            if (topicStats.getPublishers() != null) {
                                topicStats.getPublishers().forEach((producer) -> {
                                    PublisherStatsEntity publisherStatsEntity = new PublisherStatsEntity();
                                    publisherStatsEntity.setTopicStatsId(topicStatsId);
                                    publisherStatsEntity.setProducerId(producer.getProducerId());
                                    publisherStatsEntity.setProducerName(producer.getProducerName());
                                    publisherStatsEntity.setMsgRateIn(Double.parseDouble(df.format(producer.getMsgRateIn())));
                                    publisherStatsEntity.setMsgThroughputIn(Double.parseDouble(df.format(producer.getMsgThroughputIn())));
                                    publisherStatsEntity.setAverageMsgSize(Double.parseDouble(df.format(producer.getAverageMsgSize())));
                                    publisherStatsEntity.setAddress(producer.getAddress());
                                    publisherStatsEntity.setConnectedSince(producer.getConnectedSince());
                                    publisherStatsEntity.setClientVersion(producer.getClientVersion());
                                    publisherStatsEntity.setMetadata(gson.toJson(producer.getMetadata()));
                                    publisherStatsEntity.setTime_stamp(unixTime);
                                    publishersStatsRepository.save(publisherStatsEntity);
                                });
                            }
                            if (topicStats.getReplication() != null) {
                                topicStats.getReplication().forEach((replication, replicatorStats) -> {
                                    ReplicationStatsEntity replicationStatsEntity = new ReplicationStatsEntity();
                                    replicationStatsEntity.setCluster(replication);
                                    replicationStatsEntity.setTopicStatsId(topicStatsId);
                                    replicationStatsEntity.setMsgRateIn(Double.parseDouble(df.format(replicatorStats.getMsgRateIn())));
                                    replicationStatsEntity.setMsgThroughputIn(Double.parseDouble(df.format(replicatorStats.getMsgThroughputIn())));
                                    replicationStatsEntity.setMsgRateOut(Double.parseDouble(df.format(replicatorStats.getMsgRateOut())));
                                    replicationStatsEntity.setMsgThroughputOut(Double.parseDouble(df.format(replicatorStats.getMsgThroughputOut())));
                                    replicationStatsEntity.setMsgRateExpired(Double.parseDouble(df.format(replicatorStats.getMsgRateExpired())));
                                    replicationStatsEntity.setReplicationBacklog(replicatorStats.getReplicationBacklog());
                                    replicationStatsEntity.setConnected(replicatorStats.isConnected());
                                    replicationStatsEntity.setReplicationDelayInSeconds(replicatorStats.getReplicationDelayInSeconds());
                                    replicationStatsEntity.setInboundConnection(replicatorStats.getInboundConnection());
                                    replicationStatsEntity.setInboundConnectedSince(replicatorStats.getInboundConnectedSince());
                                    replicationStatsEntity.setOutboundConnection(replicatorStats.getOutboundConnection());
                                    replicationStatsEntity.setOutboundConnectedSince(replicatorStats.getOutboundConnectedSince());
                                    replicationStatsEntity.setTime_stamp(unixTime);
                                    replicationsStatsRepository.save(replicationStatsEntity);
                                });
                            }
                        });
                    });
                });
            });
        });
    }