in oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/AggregationQueryEsDAO.java [56:139]
public List<SelectedRecord> sortMetrics(final TopNCondition condition,
                                        final String valueColumnName,
                                        final Duration duration,
                                        final List<KeyValue> additionalConditions) {
    // Purpose: select the top-N entities of the metric named by `condition`.
    //   1. Filter documents by the duration's time-bucket range, the condition's attribute
    //      filters, the merged-table marker (when applicable) and any additional conditions.
    //   2. Group the matches by entity id and average the physical value column per group.
    //   3. Order the groups by that average (ascending iff the condition's order is ASC),
    //      keep `condition.getTopN()` of them and return one SelectedRecord per group.
    // Returns an empty list when the response carries no aggregations.
    final String metricName = condition.getName();
    final String avgColumn =
        IndexController.LogicIndicesRegister.getPhysicalColumnName(metricName, valueColumnName);

    final RangeQueryBuilder timeRange = Query.range(Metrics.TIME_BUCKET)
                                             .lte(duration.getEndTimeBucket())
                                             .gte(duration.getStartTimeBucket());
    final BoolQueryBuilder filter = Query.bool().must(timeRange);
    final SearchBuilder search = Search.builder();
    final boolean asc = condition.getOrder().equals(Order.ASC);

    // Attribute filters: "equals" attributes are required, all others are excluded.
    final List<AttrCondition> attributes = condition.getAttributes();
    if (CollectionUtils.isNotEmpty(attributes)) {
        for (final AttrCondition attr : attributes) {
            if (attr.isEquals()) {
                filter.must(Query.term(attr.getKey(), attr.getValue()));
            } else {
                filter.mustNot(Query.terms(attr.getKey(), attr.getValue()));
            }
        }
    }

    // For a merged table, restrict the query to documents tagged with this metric's name.
    if (IndexController.LogicIndicesRegister.isMergedTable(metricName)) {
        filter.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, metricName));
    }

    // Caller-supplied key/value pairs are appended after the merged-table clause.
    if (CollectionUtils.isNotEmpty(additionalConditions)) {
        for (final KeyValue extra : additionalConditions) {
            filter.must(Query.terms(extra.getKey(), extra.getValue()));
        }
    }
    search.query(filter);

    // Terms aggregation on the entity id, ordered by the per-bucket average sub-aggregation,
    // which is registered under the physical value column's name.
    search.aggregation(
        Aggregation.terms(Metrics.ENTITY_ID)
                   .field(Metrics.ENTITY_ID)
                   .order(BucketOrder.aggregation(avgColumn, asc))
                   .size(condition.getTopN())
                   .subAggregation(Aggregation.avg(avgColumn).field(avgColumn))
                   .executionHint(TermsAggregationBuilder.ExecutionHint.MAP)
                   .collectMode(TermsAggregationBuilder.CollectMode.BREADTH_FIRST)
                   .build());

    final TimeRangeIndexNameGenerator indexNames = new TimeRangeIndexNameGenerator(
        IndexController.LogicIndicesRegister.getPhysicalTableName(metricName),
        duration.getStartTimeBucketInSec(),
        duration.getEndTimeBucketInSec());
    final SearchResponse response = searchDebuggable(indexNames, search.build());

    final List<SelectedRecord> topNList = new ArrayList<>();
    if (response.getAggregations() == null) {
        return topNList;
    }

    final Map<String, Object> entityTerms =
        (Map<String, Object>) response.getAggregations().get(Metrics.ENTITY_ID);
    final List<Map<String, Object>> entityBuckets =
        (List<Map<String, Object>>) entityTerms.get("buckets");
    for (final Map<String, Object> bucket : entityBuckets) {
        // Each bucket holds the entity id under "key" and the average under the value column name.
        final Map<String, Object> average = (Map<String, Object>) bucket.get(avgColumn);
        final long truncated = ((Number) average.get("value")).longValue();

        final SelectedRecord selected = new SelectedRecord();
        selected.setId((String) bucket.get("key"));
        selected.setValue(String.valueOf(truncated));
        topNList.add(selected);
    }
    return topNList;
}