pkg/client/elasticsearch/client.go (264 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE.txt file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package elasticsearch
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"net/http"
"os"
"sync"
"github.com/go-logr/logr"
apierr "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/metrics/pkg/apis/custom_metrics"
"k8s.io/metrics/pkg/apis/external_metrics"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider/helpers"
esv8 "github.com/elastic/go-elasticsearch/v9"
"go.elastic.co/apm/module/apmelasticsearch/v2"
"go.elastic.co/apm/v2"
"github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/client"
"github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/config"
"github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/log"
"github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/tracing"
)
const (
query = `
{
"query": {
"bool": {
"must": [{
"exists": {
"field": "%s"
}
}, {
"match": {
"kubernetes.namespace": "%s"
}
}, {
"match": {
"kubernetes.pod.name": "%s"
}
}]
}
},
"size": 1,
"sort": [
{
"@timestamp": {
"order": "desc"
}
}
]
}
`
)
// MetricsClient is a wrapper around the Elasticsearch client to implement to metrics interface.
type MetricsClient struct {
*esv8.Client
metricServerCfg config.MetricServer
lock sync.RWMutex
// metrics list of the metrics currently known ny this client
metrics map[string]provider.CustomMetricInfo
// indexedMetrics is used to associate a metric name with an index and a field.
indexedMetrics map[string]MetricMetadata
// namer maintains an index of the metric aliases and their real names in the Elasticsearch cluster.
namer config.Namer
client dynamic.Interface
mapper apimeta.RESTMapper
tracer *apm.Tracer
logger logr.Logger
}
func (mc *MetricsClient) GetConfiguration() config.MetricServer {
return mc.metricServerCfg
}
func NewElasticsearchClient(
metricServerCfg config.MetricServer,
client dynamic.Interface,
mapper apimeta.RESTMapper,
tracer *apm.Tracer,
) (*MetricsClient, error) {
logger := log.ForPackage("elasticsearch")
tlsConfig, err := newTLSClientConfig(logger, metricServerCfg.ClientConfig.TLSClientConfig)
if err != nil {
return nil, err
}
transport := &http.Transport{
TLSClientConfig: tlsConfig,
}
cfg := esv8.Config{
Addresses: []string{os.ExpandEnv(metricServerCfg.ClientConfig.Host)},
Transport: apmelasticsearch.WrapRoundTripper(transport),
}
if metricServerCfg.ClientConfig.AuthenticationConfig != nil {
cfg.Username = os.ExpandEnv(metricServerCfg.ClientConfig.AuthenticationConfig.Username)
cfg.Password = os.ExpandEnv(metricServerCfg.ClientConfig.AuthenticationConfig.Password)
}
esClient, err := esv8.NewClient(cfg)
if err != nil {
return nil, err
}
return &MetricsClient{
logger: logger,
Client: esClient,
metricServerCfg: metricServerCfg,
client: client,
mapper: mapper,
tracer: tracer,
}, nil
}
var _ client.Interface = &MetricsClient{}
func (mc *MetricsClient) ListCustomMetricInfos() (map[provider.CustomMetricInfo]struct{}, error) {
if err := mc.discoverMetrics(); err != nil {
return nil, err
}
mc.lock.RLock()
defer mc.lock.RUnlock()
customMetrics := make(map[provider.CustomMetricInfo]struct{}, len(mc.metrics))
for i := range mc.metrics {
customMetrics[mc.metrics[i]] = struct{}{}
}
return customMetrics, nil
}
func (mc *MetricsClient) GetMetricByName(name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error) {
t, ctx := tracing.NewTransaction(context.TODO(), mc.tracer, "elasticsearch-provider", "GetMetricBySelector")
defer tracing.EndTransaction(t)
mc.logger.V(1).Info("GetMetricByName", "name", name, "info", info.String(), "metricSelector", metricSelector)
value, err := mc.valueFor(&ctx, info, name, labels.NewSelector(), []string{}, metricSelector)
if err != nil {
return nil, err
}
return mc.metricFor(&ctx, value, name, labels.Everything(), info, metricSelector)
}
func (mc *MetricsClient) GetMetricBySelector(namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) {
t, ctx := tracing.NewTransaction(context.TODO(), mc.tracer, "elasticsearch-provider", "GetMetricBySelector")
defer tracing.EndTransaction(t)
mc.logger.V(1).Info("GetMetricBySelector", "namespace", namespace, "selector", selector, "info", info.String(), "metricSelector", metricSelector)
return mc.metricsFor(&ctx, namespace, selector, info, metricSelector)
}
func (mc *MetricsClient) GetExternalMetric(
_, _ string,
_ labels.Selector,
) (*external_metrics.ExternalMetricValueList, error) {
mc.logger.Error(errors.New("not supported by Elasticsearch metrics client"), "Fail to GetExternalMetric")
return nil, nil
}
func (mc *MetricsClient) ListExternalMetrics() (map[provider.ExternalMetricInfo]struct{}, error) {
mc.logger.V(1).Info("ListAllExternalMetrics: not supported by Elasticsearch metrics client")
return nil, nil
}
func newTLSClientConfig(logger logr.Logger, config *config.TLSClientConfig) (*tls.Config, error) {
if config == nil {
// If nothing has been set just return a nil struct
logger.V(1).Info("No Elasticsearch TLS configuration provided")
return nil, nil
}
logger.V(1).Info("Loading Elasticsearch TLS configuration")
tlsConfig := &tls.Config{
InsecureSkipVerify: config.Insecure,
}
if config.CAFile != "" {
// Load CA cert
caCert, err := ioutil.ReadFile(config.CAFile)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caCertPool
}
return tlsConfig, nil
}
// valueFor is a helper function to get just the value of a specific metric
func (mc *MetricsClient) valueFor(
ctx *context.Context,
info provider.CustomMetricInfo,
name types.NamespacedName,
originalSelector labels.Selector,
objects []string,
metricSelector labels.Selector,
) (timestampedMetric, error) {
defer tracing.Span(ctx)()
info, _, err := info.Normalized(mc.mapper)
if err != nil {
return timestampedMetric{}, err
}
mc.lock.RLock()
defer mc.lock.RUnlock()
metricName, ok := mc.namer.Get(info.Metric)
if !ok {
return timestampedMetric{}, fmt.Errorf("metric name alias for custom metric %s not found", info.Metric)
}
info.Metric = metricName
metadata, ok := mc.indexedMetrics[info.Metric]
if !ok {
return timestampedMetric{}, fmt.Errorf("no metadata for metric %s", info.Metric)
}
value, err := getMetricForPod(ctx, mc.Client, metadata, name, info, metricSelector, originalSelector, objects)
if err != nil {
return timestampedMetric{}, err
}
// TODO: handle metricSelector
/*if !metricSelector.Matches(value.labels) {
return resource.Quantity{}, provider.NewMetricNotFoundForSelectorError(info.GroupResource, info.Metric, name.Name, metricSelector)
}*/
return value, nil
}
// metricFor is a helper function which formats a value, metric, and object info into a MetricValue which can be returned by the metrics API
func (mc *MetricsClient) metricFor(
ctx *context.Context,
timeStampedMetric timestampedMetric,
name types.NamespacedName,
selector labels.Selector,
info provider.CustomMetricInfo,
metricSelector labels.Selector,
) (*custom_metrics.MetricValue, error) {
defer tracing.Span(ctx)()
objRef, err := helpers.ReferenceFor(mc.mapper, name, info)
if err != nil {
return nil, err
}
metric := &custom_metrics.MetricValue{
DescribedObject: objRef,
Metric: custom_metrics.MetricIdentifier{
Name: info.Metric,
},
Timestamp: timeStampedMetric.Timestamp,
Value: timeStampedMetric.Value,
}
if len(metricSelector.String()) > 0 {
sel, err := metav1.ParseToLabelSelector(metricSelector.String())
if err != nil {
return nil, err
}
metric.Metric.Selector = sel
}
return metric, nil
}
// metricsFor is a wrapper used by GetMetricBySelector to format several metrics which match a resource selector
func (mc *MetricsClient) metricsFor(
ctx *context.Context,
namespace string,
selector labels.Selector,
info provider.CustomMetricInfo,
metricSelector labels.Selector,
) (*custom_metrics.MetricValueList, error) {
defer tracing.Span(ctx)()
mc.logger.V(1).Info("metricsFor", "selector", selector, "metric", info.String())
names, err := helpers.ListObjectNames(mc.mapper, mc.client, namespace, selector, info)
if err != nil {
return nil, err
}
res := make([]custom_metrics.MetricValue, 0, len(names))
for _, name := range names {
namespacedName := types.NamespacedName{Name: name, Namespace: namespace}
value, err := mc.valueFor(ctx, info, namespacedName, selector, names, metricSelector)
if err != nil {
if apierr.IsNotFound(err) {
continue
}
return nil, err
}
metric, err := mc.metricFor(ctx, value, namespacedName, selector, info, metricSelector)
if err != nil {
return nil, err
}
res = append(res, *metric)
}
return &custom_metrics.MetricValueList{
Items: res,
}, nil
}