in persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java [2164:2340]
private Map<String, Long> aggregateQuery(final Condition filter, final BaseAggregate aggregate, final String itemType,
final boolean optimizedQuery, int queryBucketSize) {
return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".aggregateQuery", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {
@Override
protected Map<String, Long> execute(Object... args) throws IOException {
Map<String, Long> results = new LinkedHashMap<String, Long>();
SearchRequest searchRequest = new SearchRequest(getIndexNameForQuery(itemType));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.size(0);
MatchAllQueryBuilder matchAll = QueryBuilders.matchAllQuery();
boolean isItemTypeSharingIndex = isItemTypeSharingIndex(itemType);
searchSourceBuilder.query(isItemTypeSharingIndex ? getItemTypeQueryBuilder(itemType) : matchAll);
List<AggregationBuilder> lastAggregation = new ArrayList<AggregationBuilder>();
if (aggregate != null) {
AggregationBuilder bucketsAggregation = null;
String fieldName = aggregate.getField();
if (aggregate instanceof DateAggregate) {
DateAggregate dateAggregate = (DateAggregate) aggregate;
DateHistogramAggregationBuilder dateHistogramBuilder = AggregationBuilders.dateHistogram("buckets").field(fieldName).calendarInterval(new DateHistogramInterval((dateAggregate.getInterval())));
if (dateAggregate.getFormat() != null) {
dateHistogramBuilder.format(dateAggregate.getFormat());
}
bucketsAggregation = dateHistogramBuilder;
} else if (aggregate instanceof NumericRangeAggregate) {
RangeAggregationBuilder rangebuilder = AggregationBuilders.range("buckets").field(fieldName);
for (NumericRange range : ((NumericRangeAggregate) aggregate).getRanges()) {
if (range != null) {
if (range.getFrom() != null && range.getTo() != null) {
rangebuilder.addRange(range.getKey(), range.getFrom(), range.getTo());
} else if (range.getFrom() != null) {
rangebuilder.addUnboundedFrom(range.getKey(), range.getFrom());
} else if (range.getTo() != null) {
rangebuilder.addUnboundedTo(range.getKey(), range.getTo());
}
}
}
bucketsAggregation = rangebuilder;
} else if (aggregate instanceof DateRangeAggregate) {
DateRangeAggregate dateRangeAggregate = (DateRangeAggregate) aggregate;
DateRangeAggregationBuilder rangebuilder = AggregationBuilders.dateRange("buckets").field(fieldName);
if (dateRangeAggregate.getFormat() != null) {
rangebuilder.format(dateRangeAggregate.getFormat());
}
for (DateRange range : dateRangeAggregate.getDateRanges()) {
if (range != null) {
rangebuilder.addRange(range.getKey(), range.getFrom() != null ? range.getFrom().toString() : null, range.getTo() != null ? range.getTo().toString() : null);
}
}
bucketsAggregation = rangebuilder;
} else if (aggregate instanceof IpRangeAggregate) {
IpRangeAggregate ipRangeAggregate = (IpRangeAggregate) aggregate;
IpRangeAggregationBuilder rangebuilder = AggregationBuilders.ipRange("buckets").field(fieldName);
for (IpRange range : ipRangeAggregate.getRanges()) {
if (range != null) {
rangebuilder.addRange(range.getKey(), range.getFrom(), range.getTo());
}
}
bucketsAggregation = rangebuilder;
} else {
fieldName = getPropertyNameWithData(fieldName, itemType);
//default
if (fieldName != null) {
bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(queryBucketSize);
if (aggregate instanceof TermsAggregate) {
TermsAggregate termsAggregate = (TermsAggregate) aggregate;
if (termsAggregate.getPartition() > -1 && termsAggregate.getNumPartitions() > -1) {
((TermsAggregationBuilder) bucketsAggregation).includeExclude(new IncludeExclude(termsAggregate.getPartition(), termsAggregate.getNumPartitions()));
}
}
} else {
// field name could be null if no existing data exists
}
}
if (bucketsAggregation != null) {
final MissingAggregationBuilder missingBucketsAggregation = AggregationBuilders.missing("missing").field(fieldName);
for (AggregationBuilder aggregationBuilder : lastAggregation) {
bucketsAggregation.subAggregation(aggregationBuilder);
missingBucketsAggregation.subAggregation(aggregationBuilder);
}
lastAggregation = Arrays.asList(bucketsAggregation, missingBucketsAggregation);
}
}
// If the request is optimized then we don't need a global aggregation which is very slow and we can put the query with a
// filter on range items in the query block so we don't retrieve all the document before filtering the whole
if (optimizedQuery) {
for (AggregationBuilder aggregationBuilder : lastAggregation) {
searchSourceBuilder.aggregation(aggregationBuilder);
}
if (filter != null) {
searchSourceBuilder.query(wrapWithItemTypeQuery(itemType, conditionESQueryBuilderDispatcher.buildFilter(filter)));
}
} else {
if (filter != null) {
AggregationBuilder filterAggregation = AggregationBuilders.filter("filter",
wrapWithItemTypeQuery(itemType, conditionESQueryBuilderDispatcher.buildFilter(filter)));
for (AggregationBuilder aggregationBuilder : lastAggregation) {
filterAggregation.subAggregation(aggregationBuilder);
}
lastAggregation = Collections.singletonList(filterAggregation);
}
AggregationBuilder globalAggregation = AggregationBuilders.global("global");
for (AggregationBuilder aggregationBuilder : lastAggregation) {
globalAggregation.subAggregation(aggregationBuilder);
}
searchSourceBuilder.aggregation(globalAggregation);
}
searchRequest.source(searchSourceBuilder);
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
if (aggQueryMaxResponseSizeHttp != null) {
builder.setHttpAsyncResponseConsumerFactory(
new HttpAsyncResponseConsumerFactory
.HeapBufferedResponseConsumerFactory(aggQueryMaxResponseSizeHttp));
}
SearchResponse response = client.search(searchRequest, builder.build());
Aggregations aggregations = response.getAggregations();
if (aggregations != null) {
if (optimizedQuery) {
if (response.getHits() != null) {
results.put("_filtered", response.getHits().getTotalHits().value);
}
} else {
Global globalAgg = aggregations.get("global");
results.put("_all", globalAgg.getDocCount());
aggregations = globalAgg.getAggregations();
if (aggregations.get("filter") != null) {
Filter filterAgg = aggregations.get("filter");
results.put("_filtered", filterAgg.getDocCount());
aggregations = filterAgg.getAggregations();
}
}
if (aggregations.get("buckets") != null) {
if (aggQueryThrowOnMissingDocs) {
if (aggregations.get("buckets") instanceof Terms) {
Terms terms = aggregations.get("buckets");
if (terms.getDocCountError() > 0 || terms.getSumOfOtherDocCounts() > 0) {
throw new UnsupportedOperationException("Some docs are missing in aggregation query. docCountError is:" +
terms.getDocCountError() + " sumOfOtherDocCounts:" + terms.getSumOfOtherDocCounts());
}
}
}
long totalDocCount = 0;
MultiBucketsAggregation terms = aggregations.get("buckets");
for (MultiBucketsAggregation.Bucket bucket : terms.getBuckets()) {
results.put(bucket.getKeyAsString(), bucket.getDocCount());
totalDocCount += bucket.getDocCount();
}
SingleBucketAggregation missing = aggregations.get("missing");
if (missing.getDocCount() > 0) {
results.put("_missing", missing.getDocCount());
totalDocCount += missing.getDocCount();
}
if (response.getHits() != null && TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO.equals(response.getHits().getTotalHits().relation)) {
results.put("_filtered", totalDocCount);
}
}
}
return results;
}
}.catchingExecuteInClassLoader(true);
}