func getMetricForPod()

in pkg/client/elasticsearch/query.go [80:208]


// getMetricForPod runs a single search against Elasticsearch and extracts one
// metric value and its timestamp from the response.
//
// Two query paths exist, selected by metadata.Search:
//
//   - metadata.Search != nil: the query body is rendered from
//     metadata.Search.Template using customQueryParams (metric name, pod name,
//     namespace, the label values taken from originalSelector, objects and Env).
//     The value and timestamp are then pulled out of the decoded response with
//     metadata.Search.MetricResultQuery and metadata.Search.TimestampResultQuery.
//   - metadata.Search == nil: the query is built by queryFor, the matching
//     document is located with getMetricDocument (the only consumer of
//     metricSelector), and the value/timestamp are read from
//     "_source.<metric>" and "_source.@timestamp".
//
// A NaN value is reported as a zero quantity; any other value is converted to
// a milli-quantity (value*1000 truncated to int64).
//
// An error is returned when the template cannot be rendered, the search call
// fails, Elasticsearch answers with an error status (the decoded
// estypes.ElasticsearchError is returned), the response body cannot be read or
// decoded, a result query yields an error, or a value/timestamp cannot be
// converted. On error the returned timestampedMetric is the zero value.
func getMetricForPod(
	ctx *context.Context,
	esClient *esv8.Client,
	metadata MetricMetadata,
	name types.NamespacedName,
	info provider.CustomMetricInfo,
	metricSelector labels.Selector,
	originalSelector labels.Selector,
	objects []string,
) (timestampedMetric, error) {
	defer tracing.Span(ctx)()

	var query string
	if metadata.Search != nil {
		// User specified a custom query: expose one value per selector key to
		// the template.
		podSelectors := make(map[string]string)
		// The second result of Requirements is deliberately ignored here.
		requirements, _ := originalSelector.Requirements()
		for _, requirement := range requirements {
			values := requirement.Values()
			if len(values) == 0 {
				continue
			}
			// Every iteration overwrites the entry, so the value kept is the
			// last one produced by the range. For a single-valued requirement
			// that is simply its only value.
			// NOTE(review): with several values the surviving one depends on
			// the iteration order of values — confirm this is acceptable.
			for selectorValue := range values {
				podSelectors[requirement.Key()] = selectorValue
			}
		}

		var tplBuffer bytes.Buffer
		if err := metadata.Search.Template.Execute(&tplBuffer, customQueryParams{
			Metric:       info.Metric,
			Pod:          name.Name,
			PodSelectors: podSelectors,
			Namespace:    name.Namespace,
			Objects:      objects,
			Env:          Env,
		}); err != nil {
			return timestampedMetric{}, err
		}

		query = tplBuffer.String()
	} else {
		query = queryFor(QueryParams{
			Metric: info.Metric,
			Name:   name,
		})
	}

	res, err := search(ctx, esClient, metadata, query)
	if err != nil {
		return timestampedMetric{}, err
	}
	defer res.Body.Close()

	if res.IsError() {
		// Surface the structured Elasticsearch error when the body can be
		// decoded; otherwise report the raw body together with the status.
		bodyBytes, err := io.ReadAll(res.Body)
		if err != nil {
			return timestampedMetric{}, fmt.Errorf("[%s] failed to read search response body: %w", res.Status(), err)
		}
		var errorResponse estypes.ElasticsearchError
		if err := json.Unmarshal(bodyBytes, &errorResponse); err != nil {
			return timestampedMetric{}, fmt.Errorf("[%s] failed to unmarshal search response '%s' with error %w", res.Status(), string(bodyBytes), err)
		}
		return timestampedMetric{}, errorResponse
	}

	var r map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
		// Wrap with %w (as the other error paths in this function do) so the
		// decoding error stays inspectable with errors.Is / errors.As.
		return timestampedMetric{}, fmt.Errorf("error parsing the response body: %w", err)
	}

	var value float64
	var timestamp metav1.Time

	if metadata.Search != nil {
		// Drain the metric query. Each yielded result overwrites value, so the
		// last result wins; if nothing is yielded value stays 0.
		iter := metadata.Search.MetricResultQuery.Run(r)
		for {
			v, ok := iter.Next()
			if !ok {
				break
			}
			// The iterator reports failures as a yielded error value.
			if err, ok := v.(error); ok {
				return timestampedMetric{}, err
			}
			if value, err = getFloat(v); err != nil {
				return timestampedMetric{}, err
			}
		}

		// Same pattern for the timestamp; it stays the zero metav1.Time when
		// the query yields nothing.
		iter = metadata.Search.TimestampResultQuery.Run(r)
		for {
			v, ok := iter.Next()
			if !ok {
				break
			}
			if err, ok := v.(error); ok {
				return timestampedMetric{}, err
			}
			if timestamp, err = getTimestamp(v); err != nil {
				return timestampedMetric{}, err
			}
		}
	} else {
		// Get the result from the document.
		metricDocument, err := getMetricDocument(info, name, metricSelector, r)
		if err != nil {
			return timestampedMetric{}, err
		}

		if value, err = getMetricValue(ctx, "_source."+info.Metric, metricDocument); err != nil {
			return timestampedMetric{}, err
		}

		if timestamp, err = getTimestampFromDocument(ctx, "_source.@timestamp", metricDocument); err != nil {
			return timestampedMetric{}, err
		}
	}

	// NaN cannot be expressed as a quantity, so it is reported as zero.
	// NOTE(review): an infinite value is not guarded; converting it to int64
	// gives an implementation-dependent result — confirm upstream cannot
	// produce it.
	var q *resource.Quantity
	if math.IsNaN(value) {
		q = resource.NewQuantity(0, resource.DecimalSI)
	} else {
		q = resource.NewMilliQuantity(int64(value*1000.0), resource.DecimalSI)
	}

	return timestampedMetric{
		Value:     *q,
		Timestamp: timestamp,
	}, nil
}