in loadgen/cmd/otelbench/remote_stats.go [170:230]
func (esf elasticsearchStatsFetcher) FetchStats(ctx context.Context, from, to time.Time) (map[string]float64, error) {
var filters []types.Query
if esf.FilterClusterName != "" {
filters = append(filters, types.Query{
Term: map[string]types.TermQuery{
"labels.orchestrator_cluster_name": {
Value: esf.FilterClusterName,
},
},
})
}
if esf.FilterProjectID != "" {
filters = append(filters, types.Query{
Term: map[string]types.TermQuery{
"labels.project_id": {
Value: esf.FilterProjectID,
},
},
})
}
filters = append(filters, types.Query{
Range: map[string]types.RangeQuery{
"@timestamp": types.DateRangeQuery{
Gte: some.String(from.Format("2006-01-02T15:04:05.000Z")),
Lte: some.String(to.Format("2006-01-02T15:04:05.000Z")),
},
},
})
aggs := make(map[string]types.Aggregations, len(esf.Metrics))
for _, m := range esf.Metrics {
if agg, ok := knownMetricsAggregations[m]; ok {
for k, v := range agg.Aggregations(m) {
aggs[k] = v
}
}
}
request := search.Request{
Query: &types.Query{
Bool: &types.BoolQuery{
Filter: filters,
},
},
Aggregations: aggs,
Size: some.Int(0),
}
ctx, cancel := context.WithTimeout(ctx, esf.ElasticsearchTimeout)
defer cancel()
resp, err := esf.client.Search().Index(esf.ElasticsearchIndex).Request(&request).Do(ctx)
if err != nil {
return nil, err
}
res := make(map[string]float64, len(resp.Aggregations))
for _, m := range esf.Metrics {
if agg, ok := knownMetricsAggregations[m]; ok {
if v := agg.Value(m, resp.Aggregations); v != nil {
res[m] = float64(*v)
}
}
}
return res, nil
}