pkg/client/custom_api/client.go (248 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE.txt file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package custom_api import ( "errors" "fmt" "strings" "sync" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/discovery" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/metrics/pkg/apis/custom_metrics" customMetricsAPI "k8s.io/metrics/pkg/apis/custom_metrics/v1beta2" "k8s.io/metrics/pkg/apis/external_metrics" externalMetricsAPI "k8s.io/metrics/pkg/apis/external_metrics/v1beta1" cmClient "k8s.io/metrics/pkg/client/custom_metrics" emClient "k8s.io/metrics/pkg/client/external_metrics" "sigs.k8s.io/custom-metrics-apiserver/pkg/provider" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/client" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/config" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/log" ) type metricsClientProvider struct { baseConfig *rest.Config mapper meta.RESTMapper } type metricsClient struct { logger logr.Logger metricServerCfg config.MetricServer customMetricsAvailableAPIsGetter cmClient.AvailableAPIsGetter customMetricsClient cmClient.CustomMetricsClient externalMetricsClient emClient.ExternalMetricsClient discoveryClient discovery.ServerResourcesInterface mapper meta.RESTMapper rwLock sync.RWMutex customMetricNamer, externalMetricNamer config.Namer } func (mc *metricsClient) GetConfiguration() config.MetricServer { return mc.metricServerCfg } func (mc *metricsClient) ListCustomMetricInfos() (map[provider.CustomMetricInfo]struct{}, error) { version, err := mc.customMetricsAvailableAPIsGetter.PreferredVersion() if err != nil { return nil, err } resources, err := mc.discoveryClient.ServerResourcesForGroupVersion(version.String()) if err != nil { return nil, fmt.Errorf("failed to get resource for %s: %v", customMetricsAPI.SchemeGroupVersion, err) } metricInfos := make(map[provider.CustomMetricInfo]struct{}) namer, err := config.NewNamer(mc.metricServerCfg.Rename) if err != nil { return nil, fmt.Errorf("%s: failed to create customMetricNamer: %v", mc.GetConfiguration().Name, err) } for _, r := range resources.APIResources { parts := strings.SplitN(r.Name, "/", 2) if len(parts) != 2 { mc.logger.Error(errors.New("provider returned a malformed metrics"), "Fail to list custom metrics", "provider_name", mc.metricServerCfg.Name, "metric_name", r.Name) continue } info := provider.CustomMetricInfo{ GroupResource: schema.ParseGroupResource(parts[0]), Namespaced: r.Namespaced, Metric: namer.Register(parts[1]), } metricInfos[info] = struct{}{} } mc.rwLock.Lock() defer mc.rwLock.Unlock() mc.customMetricNamer = namer return metricInfos, nil } func (mc *metricsClient) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo, selector labels.Selector) (*custom_metrics.MetricValue, error) { mc.rwLock.Lock() defer mc.rwLock.Unlock() var object *customMetricsAPI.MetricValue var err error metricName, ok := mc.customMetricNamer.Get(info.Metric) if !ok { return nil, fmt.Errorf("metric name alias for custom metric %s not found", info.Metric) } if info.Namespaced { object, err = mc.customMetricsClient.NamespacedMetrics(name.Namespace).GetForObject( schema.GroupKind{Group: info.GroupResource.Group, Kind: info.GroupResource.Resource}, name.Name, metricName, selector, ) } else { object, err = mc.customMetricsClient.RootScopedMetrics().GetForObject( schema.GroupKind{Group: info.GroupResource.Group, Kind: info.GroupResource.Resource}, name.Name, metricName, selector, ) } if err != nil { return nil, fmt.Errorf("failed to get metric from backend: %v", err) } return &custom_metrics.MetricValue{ DescribedObject: custom_metrics.ObjectReference{ Kind: object.DescribedObject.Kind, Namespace: object.DescribedObject.Namespace, Name: object.DescribedObject.Name, APIVersion: object.DescribedObject.APIVersion, ResourceVersion: object.DescribedObject.ResourceVersion, }, Metric: custom_metrics.MetricIdentifier{ Name: object.Metric.Name, Selector: object.Metric.Selector, }, Timestamp: object.Timestamp, WindowSeconds: object.WindowSeconds, Value: object.Value, }, nil } func (mc *metricsClient) GetMetricBySelector(namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) { var objects *customMetricsAPI.MetricValueList var err error kind, err := mc.mapper.ResourceSingularizer(info.GroupResource.Resource) if err != nil { return nil, fmt.Errorf("failed to singularize %s: %v", info.GroupResource.Resource, err) } mc.logger.V(1).Info("GetMetricBySelector", "metric_info", info.String()) mc.rwLock.Lock() defer mc.rwLock.Unlock() metricName, ok := mc.customMetricNamer.Get(info.Metric) if !ok { return nil, fmt.Errorf("metric name alias for custom metric %s/%s not found", namespace, info.Metric) } if info.Namespaced { objects, err = mc.customMetricsClient.NamespacedMetrics(namespace).GetForObjects( schema.GroupKind{ Group: info.GroupResource.Group, Kind: kind, }, selector, metricName, metricSelector, ) } else { objects, err = mc.customMetricsClient.RootScopedMetrics().GetForObjects( schema.GroupKind{ Group: info.GroupResource.Group, Kind: kind, }, selector, metricName, metricSelector, ) } if err != nil { return nil, fmt.Errorf("failed to get metric from backend: %v", err) } values := make([]custom_metrics.MetricValue, len(objects.Items)) for i, v := range objects.Items { values[i] = custom_metrics.MetricValue{ DescribedObject: custom_metrics.ObjectReference{ Kind: v.DescribedObject.Kind, Namespace: v.DescribedObject.Namespace, Name: v.DescribedObject.Name, APIVersion: v.DescribedObject.APIVersion, ResourceVersion: v.DescribedObject.ResourceVersion, }, Metric: custom_metrics.MetricIdentifier{ Name: v.Metric.Name, Selector: v.Metric.Selector, }, Timestamp: v.Timestamp, WindowSeconds: v.WindowSeconds, Value: v.Value, } } return &custom_metrics.MetricValueList{ Items: values, }, nil } func (mc *metricsClient) ListExternalMetrics() (map[provider.ExternalMetricInfo]struct{}, error) { infos := make(map[provider.ExternalMetricInfo]struct{}) resources, err := mc.discoveryClient.ServerResourcesForGroupVersion(externalMetricsAPI.SchemeGroupVersion.String()) if err != nil { return nil, fmt.Errorf("failed to get resource for %s: %v", externalMetricsAPI.SchemeGroupVersion, err) } for _, r := range resources.APIResources { info := provider.ExternalMetricInfo{ Metric: r.Name, } infos[info] = struct{}{} } return infos, nil } func (mc *metricsClient) GetExternalMetric(name, namespace string, selector labels.Selector) (*external_metrics.ExternalMetricValueList, error) { mc.rwLock.Lock() defer mc.rwLock.Unlock() metricName, ok := mc.externalMetricNamer.Get(name) if !ok { return nil, fmt.Errorf("metric name alias for external metric %s/%s not found", namespace, name) } result, err := mc.externalMetricsClient.NamespacedMetrics(namespace).List(metricName, selector) if err != nil { return nil, fmt.Errorf("failed to get metrics for external metric %s/%s: %v", namespace, metricName, err) } valueList := &external_metrics.ExternalMetricValueList{ Items: make([]external_metrics.ExternalMetricValue, len(result.Items)), } for i, m := range result.Items { valueList.Items[i] = external_metrics.ExternalMetricValue{ TypeMeta: metav1.TypeMeta{Kind: m.Kind, APIVersion: m.APIVersion}, MetricName: m.MetricName, MetricLabels: m.MetricLabels, Timestamp: m.Timestamp, WindowSeconds: m.WindowSeconds, Value: m.Value, } } return valueList, nil } var _ client.Interface = &metricsClient{} func NewMetricApiClientProvider(baseConfig *rest.Config, mapper meta.RESTMapper) *metricsClientProvider { return &metricsClientProvider{ baseConfig: baseConfig, mapper: mapper, } } func (mcp metricsClientProvider) NewClient( client *kubernetes.Clientset, metricServerCfg config.MetricServer, ) (client.Interface, error) { restClientConfig, err := metricServerCfg.ClientConfig.NewRestClientConfig(client, mcp.baseConfig) if err != nil { return nil, fmt.Errorf("failed to generate rest restClientConfig for %s: %s", metricServerCfg.ClientConfig.Host, err) } discoveryClient, err := discovery.NewDiscoveryClientForConfig(restClientConfig) if err != nil { return nil, fmt.Errorf("failed to create discovery client: %v", err) } customMetricsAvailableAPIsGetter := cmClient.NewAvailableAPIsGetter(discoveryClient) customMetricsClient := cmClient.NewForConfig(restClientConfig, mcp.mapper, customMetricsAvailableAPIsGetter) externalMetricsClient, err := emClient.NewForConfig(restClientConfig) if err != nil { return nil, fmt.Errorf("%s: failed to create external metrics client: %v", metricServerCfg.Name, err) } return &metricsClient{ logger: log.ForPackage("custom_api"), metricServerCfg: metricServerCfg, customMetricsAvailableAPIsGetter: customMetricsAvailableAPIsGetter, customMetricsClient: customMetricsClient, externalMetricsClient: externalMetricsClient, discoveryClient: discoveryClient, mapper: mcp.mapper, }, err }