pkg/registry/registry.go (200 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE.txt file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package registry import ( "fmt" "net/http" "sync" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/custom-metrics-apiserver/pkg/provider" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/client" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/log" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/scheduler" ) // Registry maintains a list of the available metrics and the associated metrics sources. // The aim of the registry is to cache the metrics lists as they can be expensive to retrieve and compute from // the various sources. type Registry struct { logger logr.Logger lock sync.RWMutex customMetrics map[provider.CustomMetricInfo]*metricClients externalMetrics map[provider.ExternalMetricInfo]*metricClients } func NewRegistry() *Registry { return &Registry{ logger: log.ForPackage("registry"), lock: sync.RWMutex{}, customMetrics: make(map[provider.CustomMetricInfo]*metricClients), externalMetrics: make(map[provider.ExternalMetricInfo]*metricClients), } } // getCustomMetricsFrom builds a set of the custom metrics currently served by a client. func (r *Registry) getCustomMetricsFrom(clientName string) map[provider.CustomMetricInfo]struct{} { cms := make(map[provider.CustomMetricInfo]struct{}) for cm, clients := range r.customMetrics { if clients == nil { // should not happen, but still want to be on the safe side continue } for _, c := range *clients { if c.GetConfiguration().Name == clientName { cm := cm cms[cm] = struct{}{} } } } return cms } // getExternalMetricsFrom builds a set of the external metrics currently served by a client. func (r *Registry) getExternalMetricsFrom(clientName string) map[provider.ExternalMetricInfo]struct{} { ems := make(map[provider.ExternalMetricInfo]struct{}) for em, clients := range r.externalMetrics { if clients == nil { // should not happen, but we want to be on the safe side continue } for _, c := range *clients { if c.GetConfiguration().Name == clientName { em := em ems[em] = struct{}{} } } } return ems } var _ scheduler.MetricListener = &Registry{} func (r *Registry) OnError(err error) {} func (r *Registry) UpdateCustomMetrics( metricClient client.Interface, cms map[provider.CustomMetricInfo]struct{}, ) { r.lock.Lock() defer r.lock.Unlock() clientName := metricClient.GetConfiguration().Name actualCustomMetrics := r.getCustomMetricsFrom(clientName) // Check custom metrics that are no longer served. removedCustomMetrics := getRemovedCustomMetrics(actualCustomMetrics, cms) for _, removedMetric := range removedCustomMetrics { // This custom metric is no longer served by the client. clients := r.customMetrics[removedMetric] if empty := clients.removeClient(clientName); empty { delete(r.customMetrics, removedMetric) } } for mInfo := range cms { var ok bool if _, ok = r.customMetrics[mInfo]; !ok { r.customMetrics[mInfo] = newMetricClients() } serviceList := r.customMetrics[mInfo] serviceList.addOrUpdateClient(metricClient) } } func (r *Registry) UpdateExternalMetrics( metricClient client.Interface, ems map[provider.ExternalMetricInfo]struct{}, ) { r.lock.Lock() defer r.lock.Unlock() clientName := metricClient.GetConfiguration().Name actualExternalMetrics := r.getExternalMetricsFrom(clientName) // Check external metrics that are no longer served. removedExternalMetrics := getRemovedExternalMetrics(actualExternalMetrics, ems) for _, removedMetric := range removedExternalMetrics { // This external metric is no longer served by the client. clients := r.externalMetrics[removedMetric] if empty := clients.removeClient(clientName); empty { delete(r.externalMetrics, removedMetric) } } for eInfo := range ems { var ok bool if _, ok = r.externalMetrics[eInfo]; !ok { r.externalMetrics[eInfo] = newMetricClients() } serviceList := r.externalMetrics[eInfo] serviceList.addOrUpdateClient(metricClient) } } func getRemovedCustomMetrics(old map[provider.CustomMetricInfo]struct{}, new map[provider.CustomMetricInfo]struct{}) []provider.CustomMetricInfo { var outdated []provider.CustomMetricInfo for info := range old { if _, ok := new[info]; !ok { outdated = append(outdated, info) } } return outdated } func getRemovedExternalMetrics(old map[provider.ExternalMetricInfo]struct{}, new map[provider.ExternalMetricInfo]struct{}) []provider.ExternalMetricInfo { var outdated []provider.ExternalMetricInfo for info := range old { if _, ok := new[info]; !ok { outdated = append(outdated, info) } } return outdated } func (r *Registry) GetCustomMetricClient(info provider.CustomMetricInfo) (client.Interface, error) { r.lock.RLock() defer r.lock.RUnlock() var metricClients *metricClients var ok bool if metricClients, ok = r.customMetrics[info]; !ok { r.logger.V(1).Info("Custom metric is not served by any metric client", "metric_name", info.Metric) return nil, &errors.StatusError{ ErrStatus: metav1.Status{ Status: metav1.StatusFailure, Code: http.StatusNotFound, Reason: metav1.StatusReasonNotFound, Message: fmt.Sprintf("custom metric %s is not served by any metric client", info.Metric), }} } metricClient, err := metricClients.getBestMetricClient() if err != nil { return nil, fmt.Errorf("no backend for custom metric: %v", info.Metric) } r.logger.V(1).Info( "Custom metric found", "metric", info.String(), "client_name", metricClient.GetConfiguration().Name, "client_host", metricClient.GetConfiguration().ClientConfig.Host, ) return metricClient, nil } func (r *Registry) GetExternalMetricClient(info provider.ExternalMetricInfo) (client.Interface, error) { r.lock.RLock() defer r.lock.RUnlock() var metricClients *metricClients var ok bool if metricClients, ok = r.externalMetrics[info]; !ok { return nil, &errors.StatusError{ ErrStatus: metav1.Status{ Status: metav1.StatusFailure, Code: http.StatusNotFound, Reason: metav1.StatusReasonNotFound, Message: fmt.Sprintf("external metric %s is not served by any metric client", info.Metric), }} } metricClient, err := metricClients.getBestMetricClient() if err != nil { return nil, fmt.Errorf("not backend for metric: %v", info.Metric) } r.logger.V(1).Info( "External metric found", "metric", info.Metric, "client_name", metricClient.GetConfiguration().Name, "client_host", metricClient.GetConfiguration().ClientConfig.Host, ) return metricClient, nil } func (r *Registry) ListAllCustomMetrics() []provider.CustomMetricInfo { r.lock.RLock() defer r.lock.RUnlock() infos := make([]provider.CustomMetricInfo, len(r.customMetrics)) count := 0 for k := range r.customMetrics { infos[count] = k count++ } r.logger.V(1).Info("Custom metrics served by the adapter", "count", len(infos)) return infos } func (r *Registry) ListAllExternalMetrics() []provider.ExternalMetricInfo { r.lock.RLock() defer r.lock.RUnlock() infos := make([]provider.ExternalMetricInfo, len(r.externalMetrics)) count := 0 for k := range r.externalMetrics { infos[count] = k count++ } r.logger.V(1).Info("External metrics served by the adapter", "count", len(infos)) return infos }