loadgen/cmd/otelbench/remote_stats.go:

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package main

import (
	"context"
	"errors"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/typedapi/core/search"
	"github.com/elastic/go-elasticsearch/v8/typedapi/some"
	"github.com/elastic/go-elasticsearch/v8/typedapi/types"
)

// statsAggregation is a local abstraction to aggregate
// a requested remote metric into benchmarking stats in Elasticsearch.
type statsAggregation interface {
	Aggregations(string) map[string]types.Aggregations
	Value(string, map[string]types.Aggregate) *float64
}

// avg implements average aggregation directly via the Elasticsearch Avg metric aggregation.
// OTel Gauge metrics should use this stats aggregation.
type avg struct{}

func (avg) Aggregations(metric string) map[string]types.Aggregations {
	return map[string]types.Aggregations{
		metric + "_avg": {Avg: &types.AverageAggregation{Field: some.String(metric)}},
	}
}

func (avg) Value(metric string, aggs map[string]types.Aggregate) *float64 {
	if a, ok := aggs[metric+"_avg"].(*types.AvgAggregate); ok {
		return (*float64)(a.Value)
	}
	return nil
}

// minMax implements sum aggregation by combining the Elasticsearch Max and Min
// metric aggregations. By calculating the difference between the aggregated
// max and min values, cumulative metrics are effectively converted to delta
// metrics to obtain the counter sum. OTel Counter metrics that have cumulative
// temporality should use this stats aggregation.
//
// TODO: this won't work if an additional collector aggregation dimension is
// added in the future; minMax will need to be updated to apply a nested
// aggregation per collector instance.
type minMax struct{}

func (minMax) Aggregations(metric string) map[string]types.Aggregations {
	return map[string]types.Aggregations{
		metric + "_min": {Min: &types.MinAggregation{Field: some.String(metric)}},
		metric + "_max": {Max: &types.MaxAggregation{Field: some.String(metric)}},
	}
}

func (minMax) Value(metric string, aggs map[string]types.Aggregate) *float64 {
	min, minOk := aggs[metric+"_min"].(*types.MinAggregate)
	max, maxOk := aggs[metric+"_max"].(*types.MaxAggregate)
	if minOk && maxOk && min.Value != nil && max.Value != nil {
		v := float64(*max.Value) - float64(*min.Value)
		if v == 0 {
			// When max equals min (no observed change in the window),
			// fall back to the raw max value instead of a zero delta.
			return (*float64)(max.Value)
		}
		return &v
	}
	return nil
}
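
// Illustration (hypothetical values, not part of the original file): for a
// cumulative counter sampled within the benchmark window, minMax reduces to
// the delta over the run, e.g.
//
//	samples: 1200, 2600, 5200           // cumulative otelcol_exporter_sent_spans
//	min=1200, max=5200 -> value = 4000  // spans sent during the window
//
// while gauges such as otelcol_exporter_queue_size use avg, the mean of the
// sampled values.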

// knownMetricsAggregations corresponds to the known basic-level metrics from
// https://opentelemetry.io/docs/collector/internal-telemetry/#basic-level-metrics.
var knownMetricsAggregations = map[string]statsAggregation{
	"otelcol_exporter_enqueue_failed_log_records":     minMax{},
	"otelcol_exporter_enqueue_failed_metric_points":   minMax{},
	"otelcol_exporter_enqueue_failed_spans":           minMax{},
	"otelcol_exporter_queue_capacity":                 avg{},
	"otelcol_exporter_queue_size":                     avg{},
	"otelcol_exporter_send_failed_log_records":        minMax{},
	"otelcol_exporter_send_failed_metric_points":      minMax{},
	"otelcol_exporter_send_failed_spans":              minMax{},
	"otelcol_exporter_sent_log_records":               minMax{},
	"otelcol_exporter_sent_metric_points":             minMax{},
	"otelcol_exporter_sent_spans":                     minMax{},
	"otelcol_process_cpu_seconds":                     minMax{},
	"otelcol_process_memory_rss":                      avg{},
	"otelcol_process_runtime_heap_alloc_bytes":        avg{},
	"otelcol_process_runtime_total_alloc_bytes":       minMax{},
	"otelcol_process_runtime_total_sys_memory_bytes":  avg{},
	"otelcol_process_uptime":                          minMax{},
	"otelcol_processor_batch_batch_send_size":         minMax{},
	"otelcol_processor_batch_batch_size_trigger_send": minMax{},
	"otelcol_processor_batch_metadata_cardinality":    minMax{},
	"otelcol_processor_batch_timeout_trigger_send":    minMax{},
	"otelcol_processor_incoming_items":                minMax{},
	"otelcol_processor_outgoing_items":                minMax{},
	"otelcol_receiver_accepted_log_records":           minMax{},
	"otelcol_receiver_accepted_metric_points":         minMax{},
	"otelcol_receiver_accepted_spans":                 minMax{},
	"otelcol_receiver_refused_log_records":            minMax{},
	"otelcol_receiver_refused_metric_points":          minMax{},
	"otelcol_receiver_refused_spans":                  minMax{},
	"otelcol_scraper_errored_metric_points":           minMax{},
	"otelcol_scraper_scraped_metric_points":           minMax{},
}

type elasticsearchTelemetryConfig TelemetryConfig

func (cfg elasticsearchTelemetryConfig) validate() error {
	if len(cfg.ElasticsearchURL) == 0 {
		return errors.New("remote stats will be ignored because of empty telemetry Elasticsearch URL")
	}
	if cfg.ElasticsearchAPIKey == "" && (cfg.ElasticsearchUserName == "" || cfg.ElasticsearchPassword == "") {
		return errors.New("remote stats will be ignored because of empty telemetry Elasticsearch API key and username or password")
	}
	if cfg.ElasticsearchIndex == "" {
		return errors.New("remote stats will be ignored because of empty telemetry Elasticsearch search index pattern")
	}
	if len(cfg.Metrics) == 0 {
		return errors.New("remote stats will be ignored because of empty telemetry metrics list")
	}
	return nil
}

type remoteStatsFetcher interface {
	FetchStats(ctx context.Context, from, to time.Time) (map[string]float64, error)
}

type elasticsearchStatsFetcher struct {
	elasticsearchTelemetryConfig
	client *elasticsearch.TypedClient
}

func newElasticsearchStatsFetcher(cfg elasticsearchTelemetryConfig) (remoteStatsFetcher, bool, error) {
	if err := cfg.validate(); err != nil {
		return nil, true, err
	}
	client, err := elasticsearch.NewTypedClient(elasticsearch.Config{
		Addresses:    cfg.ElasticsearchURL,
		Username:     cfg.ElasticsearchUserName,
		Password:     cfg.ElasticsearchPassword,
		APIKey:       cfg.ElasticsearchAPIKey,
		DisableRetry: true,
	})
	if err != nil {
		return nil, false, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), cfg.ElasticsearchTimeout)
	defer cancel()
	ok, err := client.Ping().Do(ctx)
	if err != nil || !ok {
		return nil, false, err
	}
	return elasticsearchStatsFetcher{elasticsearchTelemetryConfig: cfg, client: client}, true, nil
}
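
// Minimal usage sketch (hypothetical caller, not part of this file; field
// values are illustrative). The returned bool appears to distinguish
// ignorable configuration errors (true) from hard client/ping failures
// (false):
//
//	cfg := elasticsearchTelemetryConfig{
//		ElasticsearchURL:     []string{"https://localhost:9200"},
//		ElasticsearchAPIKey:  "<api-key>",
//		ElasticsearchIndex:   "metrics-*",
//		ElasticsearchTimeout: 10 * time.Second,
//		Metrics:              []string{"otelcol_exporter_sent_spans"},
//	}
//	fetcher, ignorable, err := newElasticsearchStatsFetcher(cfg)
//	if err != nil && ignorable {
//		// e.g. missing credentials: skip remote stats, keep benchmarking
//	}
//	stats, err := fetcher.FetchStats(ctx, runStart, runEnd)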

func (esf elasticsearchStatsFetcher) FetchStats(ctx context.Context, from, to time.Time) (map[string]float64, error) {
	var filters []types.Query
	if esf.FilterClusterName != "" {
		filters = append(filters, types.Query{
			Term: map[string]types.TermQuery{
				"labels.orchestrator_cluster_name": {
					Value: esf.FilterClusterName,
				},
			},
		})
	}
	if esf.FilterProjectID != "" {
		filters = append(filters, types.Query{
			Term: map[string]types.TermQuery{
				"labels.project_id": {
					Value: esf.FilterProjectID,
				},
			},
		})
	}
	// The trailing Z in the layout is literal rather than a zone designator,
	// so from/to are expected to be UTC timestamps.
	filters = append(filters, types.Query{
		Range: map[string]types.RangeQuery{
			"@timestamp": types.DateRangeQuery{
				Gte: some.String(from.Format("2006-01-02T15:04:05.000Z")),
				Lte: some.String(to.Format("2006-01-02T15:04:05.000Z")),
			},
		},
	})
	aggs := make(map[string]types.Aggregations, len(esf.Metrics))
	for _, m := range esf.Metrics {
		if agg, ok := knownMetricsAggregations[m]; ok {
			for k, v := range agg.Aggregations(m) {
				aggs[k] = v
			}
		}
	}
	request := search.Request{
		Query: &types.Query{
			Bool: &types.BoolQuery{
				Filter: filters,
			},
		},
		Aggregations: aggs,
		Size:         some.Int(0),
	}
	ctx, cancel := context.WithTimeout(ctx, esf.ElasticsearchTimeout)
	defer cancel()
	resp, err := esf.client.Search().Index(esf.ElasticsearchIndex).Request(&request).Do(ctx)
	if err != nil {
		return nil, err
	}
	res := make(map[string]float64, len(resp.Aggregations))
	for _, m := range esf.Metrics {
		if agg, ok := knownMetricsAggregations[m]; ok {
			if v := agg.Value(m, resp.Aggregations); v != nil {
				res[m] = *v
			}
		}
	}
	return res, nil
}
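
// For reference, the request built in FetchStats corresponds roughly to the
// following query DSL (sketch with illustrative values, assuming a single
// minMax metric was requested):
//
//	{
//	  "size": 0,
//	  "query": {"bool": {"filter": [
//	    {"term": {"labels.project_id": {"value": "my-project"}}},
//	    {"range": {"@timestamp": {"gte": "2025-01-01T00:00:00.000Z", "lte": "2025-01-01T00:10:00.000Z"}}}
//	  ]}},
//	  "aggregations": {
//	    "otelcol_exporter_sent_spans_min": {"min": {"field": "otelcol_exporter_sent_spans"}},
//	    "otelcol_exporter_sent_spans_max": {"max": {"field": "otelcol_exporter_sent_spans"}}
//	  }
//	}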