pkg/client/elasticsearch/query.go (274 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE.txt file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package elasticsearch
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"math"
"os"
"strings"
"time"
esv8 "github.com/elastic/go-elasticsearch/v9"
"github.com/elastic/go-elasticsearch/v9/esapi"
estypes "github.com/elastic/go-elasticsearch/v9/typedapi/types"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"
"github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/tracing"
)
type QueryParams struct {
Metric string
Name types.NamespacedName
}
var (
// Env holds the env. variables available when the adapter is started.
Env map[string]interface{}
)
func init() {
Env = make(map[string]interface{})
for _, kv := range os.Environ() {
sep := strings.Index(kv, "=")
Env[kv[0:sep]] = kv[sep+1:]
}
}
type customQueryParams struct {
Env map[string]interface{}
Metric string
Pod string
PodSelectors map[string]string
Namespace string
// All the objects in the context of the metric query, for example other Pods for the deployments
Objects []string
}
type timestampedMetric struct {
Value resource.Quantity
Timestamp metav1.Time
}
func queryFor(params QueryParams) string {
return fmt.Sprintf(query, params.Metric, params.Name.Namespace, params.Name.Name)
}
func getMetricForPod(
ctx *context.Context,
esClient *esv8.Client,
metadata MetricMetadata,
name types.NamespacedName,
info provider.CustomMetricInfo,
metricSelector labels.Selector,
originalSelector labels.Selector,
objects []string,
) (timestampedMetric, error) {
defer tracing.Span(ctx)()
var query string
if metadata.Search != nil {
// User specified a custom query
podSelectors := make(map[string]string)
requirements, _ := originalSelector.Requirements()
for _, requirement := range requirements {
values := requirement.Values()
if len(values) == 0 {
continue
}
// Get first item in the selector
for selectorValue := range values {
podSelectors[requirement.Key()] = selectorValue
}
}
tplBuffer := bytes.Buffer{}
if err := metadata.Search.Template.Execute(&tplBuffer, customQueryParams{
Metric: info.Metric,
Pod: name.Name,
PodSelectors: podSelectors,
Namespace: name.Namespace,
Objects: objects,
Env: Env,
}); err != nil {
return timestampedMetric{}, err
}
query = tplBuffer.String()
} else {
query = queryFor(QueryParams{
Metric: info.Metric,
Name: name,
})
}
res, err := search(ctx, esClient, metadata, query)
if err != nil {
return timestampedMetric{}, err
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, err := io.ReadAll(res.Body)
if err != nil {
return timestampedMetric{}, fmt.Errorf("[%s] failed to read search response body: %w", res.Status(), err)
}
var errorResponse estypes.ElasticsearchError
if err := json.Unmarshal(bodyBytes, &errorResponse); err != nil {
return timestampedMetric{}, fmt.Errorf("[%s] failed to unmarshal search response '%s' with error %w", res.Status(), string(bodyBytes), err)
}
return timestampedMetric{}, errorResponse
}
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
return timestampedMetric{}, fmt.Errorf("error parsing the response body: %s", err)
}
var value float64
var timestamp metav1.Time
if metadata.Search != nil {
iter := metadata.Search.MetricResultQuery.Run(r)
for {
v, ok := iter.Next()
if !ok {
break
}
if err, ok := v.(error); ok {
return timestampedMetric{}, err
}
if value, err = getFloat(v); err != nil {
return timestampedMetric{}, err
}
}
iter = metadata.Search.TimestampResultQuery.Run(r)
for {
v, ok := iter.Next()
if !ok {
break
}
if err, ok := v.(error); ok {
return timestampedMetric{}, err
}
if timestamp, err = getTimestamp(v); err != nil {
return timestampedMetric{}, err
}
}
} else {
// Get the result from the document.
metricDocument, err := getMetricDocument(info, name, metricSelector, r)
if err != nil {
return timestampedMetric{}, err
}
if value, err = getMetricValue(ctx, "_source."+info.Metric, metricDocument); err != nil {
return timestampedMetric{}, err
}
if timestamp, err = getTimestampFromDocument(ctx, "_source.@timestamp", metricDocument); err != nil {
return timestampedMetric{}, err
}
}
var q *resource.Quantity
if math.IsNaN(value) {
q = resource.NewQuantity(0, resource.DecimalSI)
} else {
q = resource.NewMilliQuantity(int64(value*1000.0), resource.DecimalSI)
}
return timestampedMetric{
Value: *q,
Timestamp: timestamp,
}, nil
}
func search(ctx *context.Context, esClient *esv8.Client, metadata MetricMetadata, query string) (*esapi.Response, error) {
defer tracing.Span(ctx)()
return esClient.Search(
esClient.Search.WithContext(*ctx),
esClient.Search.WithIndex(metadata.Indices...),
esClient.Search.WithBody(strings.NewReader(query)),
esClient.Search.WithTrackTotalHits(true),
esClient.Search.WithPretty(),
)
}
func getFloat(v interface{}) (float64, error) {
switch i := v.(type) {
case float64:
return i, nil
case float32:
return float64(i), nil
case int64:
return float64(i), nil
default:
return math.NaN(), fmt.Errorf("getFloat: value is of incompatible type: %v", v)
}
}
func getValue(path string, doc map[string]interface{}) (interface{}, error) {
segments := strings.Split(path, ".")
if !(len(segments) > 0) {
return 0, fmt.Errorf("no segment in path")
}
isLeaf := len(segments) == 1
root, segments := segments[0], segments[1:]
rootDoc, exists := doc[root]
if !exists {
keys := make([]string, 0, len(doc))
for k := range doc {
keys = append(keys, k)
}
return 0, fmt.Errorf("can't find leaf %s in [%s]", root, strings.Join(keys, ","))
}
if isLeaf {
// Value is expected
return rootDoc, nil
}
if innerDoc, ok := rootDoc.(map[string]interface{}); ok {
return getValue(strings.Join(segments, "."), innerDoc)
}
return 0, fmt.Errorf("not a document: %v", rootDoc)
}
func getTimestampFromDocument(ctx *context.Context, path string, doc map[string]interface{}) (metav1.Time, error) {
defer tracing.Span(ctx)()
v, err := getValue(path, doc)
if err != nil {
return metav1.Unix(0, 0), err
}
return getTimestamp(v)
}
func getTimestamp(v interface{}) (metav1.Time, error) {
if t, ok := v.(string); ok {
t, err := time.Parse(time.RFC3339, t)
if err != nil {
return metav1.Unix(0, 0), err
}
return metav1.NewTime(t), nil
}
return metav1.Unix(0, 0), fmt.Errorf("not a string: %v", v)
}
func getMetricValue(ctx *context.Context, path string, doc map[string]interface{}) (float64, error) {
defer tracing.Span(ctx)()
raw, err := getValue(path, doc)
if err != nil {
return 0, err
}
switch v := raw.(type) {
case int:
return float64(v), nil
case float64:
return v, nil
default:
return 0, fmt.Errorf("NaN: %v", v)
}
}
func getMetricDocument(
info provider.CustomMetricInfo,
name types.NamespacedName,
metricSelector labels.Selector,
doc map[string]interface{},
) (map[string]interface{}, error) {
metaHits, ok := doc["hits"]
if !ok {
return nil, provider.NewMetricNotFoundForSelectorError(info.GroupResource, info.Metric, name.Name, metricSelector)
}
hits, ok := metaHits.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("cannot convert hits: %v", metaHits)
}
docs, ok := hits["hits"]
if !ok {
return nil, provider.NewMetricNotFoundForSelectorError(info.GroupResource, info.Metric, name.Name, metricSelector)
}
documents, ok := docs.([]interface{})
if !ok {
return nil, fmt.Errorf("cannot convert docs: %v", docs)
}
if len(documents) == 0 {
return nil, provider.NewMetricNotFoundForSelectorError(info.GroupResource, info.Metric, name.Name, metricSelector)
}
document := documents[0]
result, ok := document.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("cannot convert document: %v", document)
}
return result, nil
}