pkg/trimaran/collector.go (95 lines of code) (raw):

/* Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package trimaran import ( "fmt" "sync" "time" "github.com/paypal/load-watcher/pkg/watcher" loadwatcherapi "github.com/paypal/load-watcher/pkg/watcher/api" "k8s.io/klog/v2" pluginConfig "sigs.k8s.io/scheduler-plugins/apis/config" ) const ( metricsUpdateIntervalSeconds = 30 ) // Collector : get data from load watcher, encapsulating the load watcher and its operations // // Trimaran plugins have different, potentially conflicting, objectives. Thus, it is recommended not // to enable them concurrently. As such, they are currently designed to each have its own Collector. // If a need arises in the future to enable multiple Trimaran plugins, a restructuring to have a single // Collector, serving the multiple plugins, may be beneficial for performance reasons. type Collector struct { // load watcher client client loadwatcherapi.Client // data collected by load watcher metrics watcher.WatcherMetrics // for safe access to metrics mu sync.RWMutex } // NewCollector : create an instance of a data collector func NewCollector(trimaranSpec *pluginConfig.TrimaranSpec) (*Collector, error) { if err := checkSpecs(trimaranSpec); err != nil { return nil, err } klog.V(4).InfoS("Using TrimaranSpec", "type", trimaranSpec.MetricProvider.Type, "address", trimaranSpec.MetricProvider.Address, "watcher", trimaranSpec.WatcherAddress) var client loadwatcherapi.Client if trimaranSpec.WatcherAddress != "" { client, _ = loadwatcherapi.NewServiceClient(trimaranSpec.WatcherAddress) } else { opts := watcher.MetricsProviderOpts{ Name: string(trimaranSpec.MetricProvider.Type), Address: trimaranSpec.MetricProvider.Address, AuthToken: trimaranSpec.MetricProvider.Token, InsecureSkipVerify: trimaranSpec.MetricProvider.InsecureSkipVerify, } client, _ = loadwatcherapi.NewLibraryClient(opts) } collector := &Collector{ client: client, } // populate metrics before returning err := collector.updateMetrics() if err != nil { klog.ErrorS(err, "Unable to populate metrics initially") } // start periodic updates go func() { metricsUpdaterTicker := time.NewTicker(time.Second * metricsUpdateIntervalSeconds) for range metricsUpdaterTicker.C { err = collector.updateMetrics() if err != nil { klog.ErrorS(err, "Unable to update metrics") } } }() return collector, nil } // getAllMetrics : get all metrics from watcher func (collector *Collector) getAllMetrics() *watcher.WatcherMetrics { collector.mu.RLock() metrics := collector.metrics collector.mu.RUnlock() return &metrics } // GetNodeMetrics : get metrics for a node from watcher func (collector *Collector) GetNodeMetrics(nodeName string) ([]watcher.Metric, *watcher.WatcherMetrics) { allMetrics := collector.getAllMetrics() // This happens if metrics were never populated since scheduler started if allMetrics.Data.NodeMetricsMap == nil { klog.ErrorS(nil, "Metrics not available from watcher") return nil, nil } // Check if node is new (no metrics yet) or metrics are unavailable due to 404 or 500 if _, ok := allMetrics.Data.NodeMetricsMap[nodeName]; !ok { klog.ErrorS(nil, "Unable to find metrics for node", "nodeName", nodeName) return nil, allMetrics } return allMetrics.Data.NodeMetricsMap[nodeName].Metrics, allMetrics } // checkSpecs : check trimaran specs func checkSpecs(trimaranSpec *pluginConfig.TrimaranSpec) error { if trimaranSpec.WatcherAddress == "" { metricProviderType := string(trimaranSpec.MetricProvider.Type) validMetricProviderType := metricProviderType == string(pluginConfig.KubernetesMetricsServer) || metricProviderType == string(pluginConfig.Prometheus) || metricProviderType == string(pluginConfig.SignalFx) if !validMetricProviderType { return fmt.Errorf("invalid MetricProvider.Type, got %v", trimaranSpec.MetricProvider.Type) } } return nil } // updateMetrics : request to load watcher to update all metrics func (collector *Collector) updateMetrics() error { metrics, err := collector.client.GetLatestWatcherMetrics() if err != nil { klog.ErrorS(err, "Load watcher client failed") return err } collector.mu.Lock() collector.metrics = *metrics collector.mu.Unlock() return nil }