pkg/client/elasticsearch/discovery.go (192 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE.txt file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package elasticsearch
import (
"context"
"encoding/json"
"fmt"
"strings"
"text/template"
"github.com/go-logr/logr"
"github.com/itchyny/gojq"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"
esv8 "github.com/elastic/go-elasticsearch/v9"
"github.com/elastic/go-elasticsearch/v9/esapi"
"github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/config"
)
var allowedTypes = map[string]struct{}{
"byte": {},
"double": {},
"float": {},
"half_float": {},
"integer": {},
"long": {},
"scaled_float": {},
"short": {},
"unsigned_long": {},
}
func isTypeAllowed(t string) bool {
_, ok := allowedTypes[t]
return ok
}
type MetricMetadata struct {
Fields config.Fields
Search *config.Search
Indices []string
MetricsProvider provider.MetricsProvider
}
// discoverMetrics attempts to create a list of the available metrics and maintains an internal state.
func (mc *MetricsClient) discoverMetrics() error {
namer, err := config.NewNamer(mc.GetConfiguration().Rename)
if err != nil {
return fmt.Errorf("%s: failed to create namer: %v", mc.GetConfiguration().Name, err)
}
metricRecorder := newRecorder(namer)
// We first record static fields, they do not require to read the mapping
for _, metricSet := range mc.metricServerCfg.MetricSets {
for _, field := range metricSet.Fields {
if len(field.Name) > 0 {
search := field.Search
search.Template = template.Must(template.New("").Parse(search.Body))
metricResultQuery, err := gojq.Parse(search.MetricPath)
if err != nil {
return fmt.Errorf("error while parsing metricResultQuery for field %s: error: %v", field.Name, err)
}
search.MetricResultQuery = metricResultQuery
timestampResultQuery, err := gojq.Parse(search.TimestampPath)
if err != nil {
return fmt.Errorf("error while parsing timestampResultQuery for field %s: error: %v", field.Name, err)
}
search.TimestampResultQuery = timestampResultQuery
// This is a static field, save the request body and the metric path
metricRecorder.indexedMetrics[field.Name] = MetricMetadata{
Search: &search,
Indices: metricSet.Indices,
}
metricRecorder.metrics[field.Name] = provider.CustomMetricInfo{
GroupResource: schema.GroupResource{ // TODO: infer resource from configuration
Group: "",
Resource: "pods",
},
Namespaced: true,
Metric: field.Name,
}
}
}
}
for _, metricSet := range mc.metricServerCfg.MetricSets {
if err := getMappingFor(mc.logger, metricSet, mc.Client, metricRecorder); err != nil {
return err
}
}
mc.lock.Lock()
defer mc.lock.Unlock()
mc.metrics = metricRecorder.metrics
mc.indexedMetrics = metricRecorder.indexedMetrics
mc.namer = namer
return nil
}
func getMappingFor(logger logr.Logger, metricSet config.MetricSet, esClient *esv8.Client, recorder *recorder) error {
req := esapi.IndicesGetMappingRequest{Index: metricSet.Indices}
res, err := req.Do(context.Background(), esClient)
if err != nil {
return fmt.Errorf("discovery error, got response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
return fmt.Errorf("[%s] Error getting index mapping %v", res.Status(), metricSet.Indices)
} else {
// Deserialize the response into a map.
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
return fmt.Errorf("error parsing the response body: %s", err)
} else {
if len(r) == 0 {
logger.Info("Mapping is empty", "index_pattern", strings.Join(metricSet.Indices, ","))
return nil
}
// Process mapping
for _, indexMapping := range r {
m := indexMapping.(map[string]interface{})
mapping, hasMapping := m["mappings"]
if !hasMapping {
return fmt.Errorf("discovery error: no 'mapping' field in %s", metricSet.Indices)
}
recorder.processMappingDocument(mapping, metricSet.Fields, metricSet.Indices)
}
}
}
return nil
}
func (r *recorder) processMappingDocument(mapping interface{}, fields config.FieldsSet, indices []string) {
tm, ok := mapping.(map[string]interface{})
if !ok {
return
}
rp := tm["properties"]
rpm, ok := rp.(map[string]interface{})
if !ok {
return
}
r._processMappingDocument("", rpm, fields, indices)
}
func newRecorder(namer config.Namer) *recorder {
return &recorder{
metrics: make(map[string]provider.CustomMetricInfo),
indexedMetrics: make(map[string]MetricMetadata),
namer: namer,
}
}
type recorder struct {
metrics map[string]provider.CustomMetricInfo
indexedMetrics map[string]MetricMetadata
namer config.Namer
}
func (r *recorder) _processMappingDocument(root string, d map[string]interface{}, fieldsSet config.FieldsSet, indices []string) {
for k, t := range d {
if k == "*" {
continue
}
if k == "properties" {
tm, ok := t.(map[string]interface{})
if !ok {
continue
}
r._processMappingDocument(root, tm, fieldsSet, indices)
} else {
// Is there a properties child ?
child, ok := t.(map[string]interface{})
if !ok {
continue
}
if _, hasProperties := child["properties"]; hasProperties {
var newRoot string
if root == "" {
newRoot = k
} else {
newRoot = fmt.Sprintf("%s.%s", root, k)
}
r._processMappingDocument(newRoot, child, fieldsSet, indices)
} else {
// Ensure that we have a type
if t, hasType := child["type"]; !(hasType && isTypeAllowed(t.(string))) {
continue
}
metricName := ""
// New metric
if root == "" {
metricName = k
} else {
metricName = fmt.Sprintf("%s.%s", root, k)
}
fields := fieldsSet.FindMetadata(metricName)
if fields == nil {
// field does not match a pattern, do not register it as available
continue
}
r.metrics[metricName] = provider.CustomMetricInfo{
GroupResource: schema.GroupResource{ // TODO: infer resource from configuration
Group: "",
Resource: "pods",
},
Namespaced: true,
Metric: r.namer.Register(metricName),
}
r.indexedMetrics[metricName] = MetricMetadata{
Fields: *fields,
Indices: indices,
}
}
}
}
}