private void onNewEntryResponse()

in ml-algorithms/src/main/java/org/opensearch/ml/engine/algorithms/anomalylocalization/AnomalyLocalizerImpl.java [214:244]


    private void onNewEntryResponse(SearchResponse response, AnomalyLocalizationInput input, AggregationBuilder agg,
                                    AnomalyLocalizationOutput.Result result,
                                    AnomalyLocalizationOutput.Bucket outputBucket, PriorityQueue<AnomalyLocalizationOutput.Entity> queue,
                                    AnomalyLocalizationOutput output,
                                    ActionListener<AnomalyLocalizationOutput> listener) {
        Optional<CompositeAggregation> respAgg =
                Optional.ofNullable(response.getAggregations()).map(aggs -> (CompositeAggregation) aggs.get(agg.getName()));
        for (CompositeAggregation.Bucket bucket : respAgg.map(a -> a.getBuckets()).orElse(Collections.emptyList())) {
            List<String> key = toStringKey(bucket.getKey(), input);
            AnomalyLocalizationOutput.Entity entity = new AnomalyLocalizationOutput.Entity();
            entity.setKey(key);
            entity.setNewValue(getDoubleValue((SingleValue) bucket.getAggregations().get(agg.getName())));
            entity.setBaseValue(outputBucket.getBase().get().getCounter().get().estimate(key));
            entity.setContributionValue(entity.getNewValue() - entity.getBaseValue());
            if (queue.size() < input.getNumOutputs()) {
                queue.add(entity);
            } else if (queue.comparator().compare(queue.peek(), entity) < 0) {
                queue.poll();
                queue.add(entity);
            }
        }
        Optional<Map<String, Object>> afterKey = respAgg.map(r -> r.afterKey());
        if (afterKey.isPresent()) {
            processNewEntry(input, agg, result, outputBucket, afterKey, queue, output, listener);
        } else {
            List<List<String>> keys = queue.stream().map(AnomalyLocalizationOutput.Entity::getKey).collect(Collectors.toList());
            SearchRequest request = newSearchRequestForEntityKeys(input, agg, outputBucket, keys);
            client.search(request, wrap(r -> onEntityKeysResponse(r, input, agg, result, outputBucket, queue, output, listener),
                    listener::onFailure));
        }
    }