in pkg/client/elasticsearch/query.go [80:208]
// getMetricForPod runs a search against Elasticsearch for the metric described
// by info and the Pod identified by name, and converts the result into a
// timestampedMetric.
//
// When metadata.Search is set, the query body is rendered from the
// user-provided template, and the value and timestamp are extracted from the
// decoded response with metadata.Search.MetricResultQuery and
// metadata.Search.TimestampResultQuery. Otherwise the query is built with
// queryFor and the value and timestamp are read from the "_source" of the
// document returned by getMetricDocument.
//
// An error is returned when the template cannot be rendered, the search fails,
// Elasticsearch answers with an error status, the response body cannot be
// decoded, or the value or timestamp cannot be extracted. A NaN value is
// reported as a zero quantity.
func getMetricForPod(
	ctx *context.Context,
	esClient *esv8.Client,
	metadata MetricMetadata,
	name types.NamespacedName,
	info provider.CustomMetricInfo,
	metricSelector labels.Selector,
	originalSelector labels.Selector,
	objects []string,
) (timestampedMetric, error) {
	defer tracing.Span(ctx)()

	var query string
	if metadata.Search != nil {
		// User specified a custom query.
		podSelectors := make(map[string]string)
		// Guard against a nil selector: calling a method on a nil interface
		// would panic. In that case the template simply sees no selectors.
		if originalSelector != nil {
			// The second return value is deliberately ignored; only the
			// requirements themselves are needed to fill the template.
			// NOTE(review): presumably this is the "selectable" flag of
			// labels.Selector — confirm it never needs to be honoured here.
			requirements, _ := originalSelector.Requirements()
			for _, requirement := range requirements {
				// Only one value per key is exposed to the template. The value
				// set has no defined iteration order, so pick the
				// lexicographically smallest value to keep the rendered query
				// stable across calls.
				selected, found := "", false
				for candidate := range requirement.Values() {
					if !found || candidate < selected {
						selected, found = candidate, true
					}
				}
				if !found {
					// Requirement without values: nothing to expose.
					continue
				}
				podSelectors[requirement.Key()] = selected
			}
		}

		var tplBuffer bytes.Buffer
		if err := metadata.Search.Template.Execute(&tplBuffer, customQueryParams{
			Metric:       info.Metric,
			Pod:          name.Name,
			PodSelectors: podSelectors,
			Namespace:    name.Namespace,
			Objects:      objects,
			Env:          Env,
		}); err != nil {
			return timestampedMetric{}, err
		}
		query = tplBuffer.String()
	} else {
		query = queryFor(QueryParams{
			Metric: info.Metric,
			Name:   name,
		})
	}

	res, err := search(ctx, esClient, metadata, query)
	if err != nil {
		return timestampedMetric{}, err
	}
	defer res.Body.Close()

	if res.IsError() {
		// Try to surface the structured Elasticsearch error; fall back to the
		// raw body in the message when it cannot be decoded.
		bodyBytes, err := io.ReadAll(res.Body)
		if err != nil {
			return timestampedMetric{}, fmt.Errorf("[%s] failed to read search response body: %w", res.Status(), err)
		}
		var errorResponse estypes.ElasticsearchError
		if err := json.Unmarshal(bodyBytes, &errorResponse); err != nil {
			return timestampedMetric{}, fmt.Errorf("[%s] failed to unmarshal search response '%s' with error %w", res.Status(), string(bodyBytes), err)
		}
		return timestampedMetric{}, errorResponse
	}

	var r map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
		// Wrap with %w so callers can inspect the underlying decode error.
		return timestampedMetric{}, fmt.Errorf("error parsing the response body: %w", err)
	}

	var value float64
	var timestamp metav1.Time
	if metadata.Search != nil {
		// Run the user-provided result queries against the decoded response.
		// If a query yields several results the last one wins; if it yields
		// none, the zero value is kept.
		iter := metadata.Search.MetricResultQuery.Run(r)
		for {
			v, ok := iter.Next()
			if !ok {
				break
			}
			if err, ok := v.(error); ok {
				return timestampedMetric{}, err
			}
			if value, err = getFloat(v); err != nil {
				return timestampedMetric{}, err
			}
		}
		iter = metadata.Search.TimestampResultQuery.Run(r)
		for {
			v, ok := iter.Next()
			if !ok {
				break
			}
			if err, ok := v.(error); ok {
				return timestampedMetric{}, err
			}
			if timestamp, err = getTimestamp(v); err != nil {
				return timestampedMetric{}, err
			}
		}
	} else {
		// Get the result from the document.
		metricDocument, err := getMetricDocument(info, name, metricSelector, r)
		if err != nil {
			return timestampedMetric{}, err
		}
		if value, err = getMetricValue(ctx, "_source."+info.Metric, metricDocument); err != nil {
			return timestampedMetric{}, err
		}
		if timestamp, err = getTimestampFromDocument(ctx, "_source.@timestamp", metricDocument); err != nil {
			return timestampedMetric{}, err
		}
	}

	// NaN cannot be converted to an integer quantity, so it is reported as 0.
	// NOTE(review): ±Inf and values beyond the int64 milli range are not
	// guarded; Go leaves out-of-range float-to-int conversions
	// implementation-dependent.
	var q *resource.Quantity
	if math.IsNaN(value) {
		q = resource.NewQuantity(0, resource.DecimalSI)
	} else {
		q = resource.NewMilliQuantity(int64(value*1000.0), resource.DecimalSI)
	}
	return timestampedMetric{
		Value:     *q,
		Timestamp: timestamp,
	}, nil
}