func()

in loadgen/cmd/otelbench/remote_stats.go [170:230]


// FetchStats runs a single aggregation-only search against
// esf.ElasticsearchIndex and returns one value per requested metric.
//
// The search is restricted to documents whose @timestamp lies in the
// inclusive range [from, to], and, when the corresponding fields are set,
// to documents matching esf.FilterClusterName and esf.FilterProjectID.
// Entries of esf.Metrics that have no entry in knownMetricsAggregations are
// skipped, as are metrics for which the aggregation yields no value, so the
// returned map may hold fewer keys than len(esf.Metrics).
//
// The request is bounded by esf.ElasticsearchTimeout in addition to any
// deadline already carried by ctx. Any error from the search is returned
// unchanged with a nil map.
func (esf elasticsearchStatsFetcher) FetchStats(ctx context.Context, from, to time.Time) (map[string]float64, error) {
	// The trailing "Z" in this layout is a literal character, not a zone
	// verb, so timestamps must be converted to UTC before formatting;
	// otherwise a local wall-clock time would be sent labelled as UTC.
	const timestampLayout = "2006-01-02T15:04:05.000Z"

	var filters []types.Query
	if esf.FilterClusterName != "" {
		filters = append(filters, types.Query{
			Term: map[string]types.TermQuery{
				"labels.orchestrator_cluster_name": {
					Value: esf.FilterClusterName,
				},
			},
		})
	}
	if esf.FilterProjectID != "" {
		filters = append(filters, types.Query{
			Term: map[string]types.TermQuery{
				"labels.project_id": {
					Value: esf.FilterProjectID,
				},
			},
		})
	}
	filters = append(filters, types.Query{
		Range: map[string]types.RangeQuery{
			"@timestamp": types.DateRangeQuery{
				Gte: some.String(from.UTC().Format(timestampLayout)),
				Lte: some.String(to.UTC().Format(timestampLayout)),
			},
		},
	})

	// Merge the aggregations contributed by every known metric into a
	// single request body.
	aggs := make(map[string]types.Aggregations, len(esf.Metrics))
	for _, m := range esf.Metrics {
		if agg, ok := knownMetricsAggregations[m]; ok {
			for k, v := range agg.Aggregations(m) {
				aggs[k] = v
			}
		}
	}

	request := search.Request{
		Query: &types.Query{
			Bool: &types.BoolQuery{
				Filter: filters,
			},
		},
		Aggregations: aggs,
		// Only the aggregation results are needed, not the matching hits.
		Size: some.Int(0),
	}

	ctx, cancel := context.WithTimeout(ctx, esf.ElasticsearchTimeout)
	defer cancel()
	resp, err := esf.client.Search().Index(esf.ElasticsearchIndex).Request(&request).Do(ctx)
	if err != nil {
		return nil, err
	}

	res := make(map[string]float64, len(resp.Aggregations))
	for _, m := range esf.Metrics {
		if agg, ok := knownMetricsAggregations[m]; ok {
			// A nil value means the response carried no result for this
			// metric; leave it out of the map rather than reporting zero.
			if v := agg.Value(m, resp.Aggregations); v != nil {
				res[m] = float64(*v)
			}
		}
	}
	return res, nil
}