private Map aggregateQuery()

in persistence-elasticsearch/core/src/main/java/org/apache/unomi/persistence/elasticsearch/ElasticSearchPersistenceServiceImpl.java [2164:2340]


    private Map<String, Long> aggregateQuery(final Condition filter, final BaseAggregate aggregate, final String itemType,
                                             final boolean optimizedQuery, int queryBucketSize) {
        return new InClassLoaderExecute<Map<String, Long>>(metricsService, this.getClass().getName() + ".aggregateQuery", this.bundleContext, this.fatalIllegalStateErrors, throwExceptions) {

            @Override
            protected Map<String, Long> execute(Object... args) throws IOException {
                Map<String, Long> results = new LinkedHashMap<String, Long>();

                SearchRequest searchRequest = new SearchRequest(getIndexNameForQuery(itemType));
                SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
                searchSourceBuilder.size(0);
                MatchAllQueryBuilder matchAll = QueryBuilders.matchAllQuery();
                boolean isItemTypeSharingIndex = isItemTypeSharingIndex(itemType);
                searchSourceBuilder.query(isItemTypeSharingIndex ? getItemTypeQueryBuilder(itemType) : matchAll);
                List<AggregationBuilder> lastAggregation = new ArrayList<AggregationBuilder>();

                if (aggregate != null) {
                    AggregationBuilder bucketsAggregation = null;
                    String fieldName = aggregate.getField();
                    if (aggregate instanceof DateAggregate) {
                        DateAggregate dateAggregate = (DateAggregate) aggregate;
                        DateHistogramAggregationBuilder dateHistogramBuilder = AggregationBuilders.dateHistogram("buckets").field(fieldName).calendarInterval(new DateHistogramInterval((dateAggregate.getInterval())));
                        if (dateAggregate.getFormat() != null) {
                            dateHistogramBuilder.format(dateAggregate.getFormat());
                        }
                        bucketsAggregation = dateHistogramBuilder;
                    } else if (aggregate instanceof NumericRangeAggregate) {
                        RangeAggregationBuilder rangebuilder = AggregationBuilders.range("buckets").field(fieldName);
                        for (NumericRange range : ((NumericRangeAggregate) aggregate).getRanges()) {
                            if (range != null) {
                                if (range.getFrom() != null && range.getTo() != null) {
                                    rangebuilder.addRange(range.getKey(), range.getFrom(), range.getTo());
                                } else if (range.getFrom() != null) {
                                    rangebuilder.addUnboundedFrom(range.getKey(), range.getFrom());
                                } else if (range.getTo() != null) {
                                    rangebuilder.addUnboundedTo(range.getKey(), range.getTo());
                                }
                            }
                        }
                        bucketsAggregation = rangebuilder;
                    } else if (aggregate instanceof DateRangeAggregate) {
                        DateRangeAggregate dateRangeAggregate = (DateRangeAggregate) aggregate;
                        DateRangeAggregationBuilder rangebuilder = AggregationBuilders.dateRange("buckets").field(fieldName);
                        if (dateRangeAggregate.getFormat() != null) {
                            rangebuilder.format(dateRangeAggregate.getFormat());
                        }
                        for (DateRange range : dateRangeAggregate.getDateRanges()) {
                            if (range != null) {
                                rangebuilder.addRange(range.getKey(), range.getFrom() != null ? range.getFrom().toString() : null, range.getTo() != null ? range.getTo().toString() : null);
                            }
                        }
                        bucketsAggregation = rangebuilder;
                    } else if (aggregate instanceof IpRangeAggregate) {
                        IpRangeAggregate ipRangeAggregate = (IpRangeAggregate) aggregate;
                        IpRangeAggregationBuilder rangebuilder = AggregationBuilders.ipRange("buckets").field(fieldName);
                        for (IpRange range : ipRangeAggregate.getRanges()) {
                            if (range != null) {
                                rangebuilder.addRange(range.getKey(), range.getFrom(), range.getTo());
                            }
                        }
                        bucketsAggregation = rangebuilder;
                    } else {
                        fieldName = getPropertyNameWithData(fieldName, itemType);
                        //default
                        if (fieldName != null) {
                            bucketsAggregation = AggregationBuilders.terms("buckets").field(fieldName).size(queryBucketSize);
                            if (aggregate instanceof TermsAggregate) {
                                TermsAggregate termsAggregate = (TermsAggregate) aggregate;
                                if (termsAggregate.getPartition() > -1 && termsAggregate.getNumPartitions() > -1) {
                                    ((TermsAggregationBuilder) bucketsAggregation).includeExclude(new IncludeExclude(termsAggregate.getPartition(), termsAggregate.getNumPartitions()));
                                }
                            }
                        } else {
                            // field name could be null if no existing data exists
                        }
                    }
                    if (bucketsAggregation != null) {
                        final MissingAggregationBuilder missingBucketsAggregation = AggregationBuilders.missing("missing").field(fieldName);
                        for (AggregationBuilder aggregationBuilder : lastAggregation) {
                            bucketsAggregation.subAggregation(aggregationBuilder);
                            missingBucketsAggregation.subAggregation(aggregationBuilder);
                        }
                        lastAggregation = Arrays.asList(bucketsAggregation, missingBucketsAggregation);
                    }
                }

                // If the request is optimized then we don't need a global aggregation which is very slow and we can put the query with a
                // filter on range items in the query block so we don't retrieve all the document before filtering the whole
                if (optimizedQuery) {
                    for (AggregationBuilder aggregationBuilder : lastAggregation) {
                        searchSourceBuilder.aggregation(aggregationBuilder);
                    }

                    if (filter != null) {
                        searchSourceBuilder.query(wrapWithItemTypeQuery(itemType, conditionESQueryBuilderDispatcher.buildFilter(filter)));
                    }
                } else {
                    if (filter != null) {
                        AggregationBuilder filterAggregation = AggregationBuilders.filter("filter",
                                wrapWithItemTypeQuery(itemType, conditionESQueryBuilderDispatcher.buildFilter(filter)));
                        for (AggregationBuilder aggregationBuilder : lastAggregation) {
                            filterAggregation.subAggregation(aggregationBuilder);
                        }
                        lastAggregation = Collections.singletonList(filterAggregation);
                    }

                    AggregationBuilder globalAggregation = AggregationBuilders.global("global");
                    for (AggregationBuilder aggregationBuilder : lastAggregation) {
                        globalAggregation.subAggregation(aggregationBuilder);
                    }

                    searchSourceBuilder.aggregation(globalAggregation);
                }

                searchRequest.source(searchSourceBuilder);

                RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();

                if (aggQueryMaxResponseSizeHttp != null) {
                    builder.setHttpAsyncResponseConsumerFactory(
                            new HttpAsyncResponseConsumerFactory
                                    .HeapBufferedResponseConsumerFactory(aggQueryMaxResponseSizeHttp));
                }

                SearchResponse response = client.search(searchRequest, builder.build());
                Aggregations aggregations = response.getAggregations();


                if (aggregations != null) {

                    if (optimizedQuery) {
                        if (response.getHits() != null) {
                            results.put("_filtered", response.getHits().getTotalHits().value);
                        }
                    } else {
                        Global globalAgg = aggregations.get("global");
                        results.put("_all", globalAgg.getDocCount());
                        aggregations = globalAgg.getAggregations();

                        if (aggregations.get("filter") != null) {
                            Filter filterAgg = aggregations.get("filter");
                            results.put("_filtered", filterAgg.getDocCount());
                            aggregations = filterAgg.getAggregations();
                        }
                    }
                    if (aggregations.get("buckets") != null) {

                        if (aggQueryThrowOnMissingDocs) {
                            if (aggregations.get("buckets") instanceof Terms) {
                                Terms terms = aggregations.get("buckets");
                                if (terms.getDocCountError() > 0 || terms.getSumOfOtherDocCounts() > 0) {
                                    throw new UnsupportedOperationException("Some docs are missing in aggregation query. docCountError is:" +
                                            terms.getDocCountError() + " sumOfOtherDocCounts:" + terms.getSumOfOtherDocCounts());
                                }
                            }
                        }

                        long totalDocCount = 0;
                        MultiBucketsAggregation terms = aggregations.get("buckets");
                        for (MultiBucketsAggregation.Bucket bucket : terms.getBuckets()) {
                            results.put(bucket.getKeyAsString(), bucket.getDocCount());
                            totalDocCount += bucket.getDocCount();
                        }
                        SingleBucketAggregation missing = aggregations.get("missing");
                        if (missing.getDocCount() > 0) {
                            results.put("_missing", missing.getDocCount());
                            totalDocCount += missing.getDocCount();
                        }
                        if (response.getHits() != null && TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO.equals(response.getHits().getTotalHits().relation)) {
                            results.put("_filtered", totalDocCount);
                        }
                    }
                }
                return results;
            }
        }.catchingExecuteInClassLoader(true);
    }