pulsaradmin/pkg/utils/load_manage_report.go (99 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package utils import ( "math" ) type LocalBrokerData struct { // URLs to satisfy contract of ServiceLookupData (used by NamespaceService). WebServiceURL string `json:"webServiceUrl"` WebServiceURLTLS string `json:"webServiceUrlTls"` PulsarServiceURL string `json:"pulsarServiceUrl"` PulsarServiceURLTLS string `json:"pulsarServiceUrlTls"` PersistentTopicsEnabled bool `json:"persistentTopicsEnabled"` NonPersistentTopicsEnabled bool `json:"nonPersistentTopicsEnabled"` // Most recently available system resource usage. CPU ResourceUsage `json:"cpu"` Memory ResourceUsage `json:"memory"` DirectMemory ResourceUsage `json:"directMemory"` BandwidthIn ResourceUsage `json:"bandwidthIn"` BandwidthOut ResourceUsage `json:"bandwidthOut"` // Message data from the most recent namespace bundle stats. MsgThroughputIn float64 `json:"msgThroughputIn"` MsgThroughputOut float64 `json:"msgThroughputOut"` MsgRateIn float64 `json:"msgRateIn"` MsgRateOut float64 `json:"msgRateOut"` // Timestamp of last update. LastUpdate int64 `json:"lastUpdate"` // The stats given in the most recent invocation of update. LastStats map[string]*NamespaceBundleStats `json:"lastStats"` NumTopics int `json:"numTopics"` NumBundles int `json:"numBundles"` NumConsumers int `json:"numConsumers"` NumProducers int `json:"numProducers"` // All bundles belonging to this broker. Bundles []string `json:"bundles"` // The bundles gained since the last invocation of update. LastBundleGains []string `json:"lastBundleGains"` // The bundles lost since the last invocation of update. LastBundleLosses []string `json:"lastBundleLosses"` // The version string that this broker is running, obtained from the Maven build artifact in the POM BrokerVersionString string `json:"brokerVersionString"` // This place-holder requires to identify correct LoadManagerReport type while deserializing LoadReportType string `json:"loadReportType"` // the external protocol data advertised by protocol handlers. Protocols map[string]string `json:"protocols"` } func NewLocalBrokerData() LocalBrokerData { lastStats := make(map[string]*NamespaceBundleStats) lastStats[""] = NewNamespaceBundleStats() return LocalBrokerData{ LastStats: lastStats, } } type NamespaceBundleStats struct { MsgRateIn float64 `json:"msgRateIn"` MsgThroughputIn float64 `json:"msgThroughputIn"` MsgRateOut float64 `json:"msgRateOut"` MsgThroughputOut float64 `json:"msgThroughputOut"` ConsumerCount int `json:"consumerCount"` ProducerCount int `json:"producerCount"` TopicsNum int64 `json:"topics"` CacheSize int64 `json:"cacheSize"` // Consider the throughput equal if difference is less than 100 KB/s ThroughputDifferenceThreshold float64 `json:"throughputDifferenceThreshold"` // Consider the msgRate equal if the difference is less than 100 MsgRateDifferenceThreshold float64 `json:"msgRateDifferenceThreshold"` // Consider the total topics/producers/consumers equal if the difference is less than 500 TopicConnectionDifferenceThreshold int64 `json:"topicConnectionDifferenceThreshold"` // Consider the cache size equal if the difference is less than 100 kb CacheSizeDifferenceThreshold int64 `json:"cacheSizeDifferenceThreshold"` } func NewNamespaceBundleStats() *NamespaceBundleStats { return &NamespaceBundleStats{ ThroughputDifferenceThreshold: 1e5, MsgRateDifferenceThreshold: 100, TopicConnectionDifferenceThreshold: 500, CacheSizeDifferenceThreshold: 100000, } } type ResourceUsage struct { Usage float64 `json:"usage"` Limit float64 `json:"limit"` } func (ru *ResourceUsage) Reset() { ru.Usage = -1 ru.Limit = -1 } func (ru *ResourceUsage) CompareTo(o *ResourceUsage) int { required := o.Limit - o.Usage available := ru.Limit - ru.Usage return compare(required, available) } func (ru *ResourceUsage) PercentUsage() float32 { var proportion float32 if ru.Limit > 0 { proportion = float32(ru.Usage) / float32(ru.Limit) } return proportion * 100 } func compare(val1, val2 float64) int { if val1 < val2 { return -1 } if val1 < val2 { return 1 } thisBits := math.Float64bits(val1) anotherBits := math.Float64bits(val2) if thisBits == anotherBits { return 0 } if thisBits < anotherBits { return -1 } return 1 }