in alerting/src/main/kotlin/org/opensearch/alerting/aggregation/bucketselectorext/BucketSelectorExtAggregator.kt [88:141]
/**
 * Resolves the multi-bucket aggregation addressed by [parentBucketPath], optionally filters its
 * buckets by key, evaluates the selector script against every remaining bucket, and returns the
 * positions of the buckets for which the script returned `true`.
 *
 * Every path element except the last is resolved as a [SingleBucketAggregation] whose
 * sub-aggregations are descended into; the last element must name an
 * [InternalMultiBucketAggregation].
 *
 * @param aggregations the sibling aggregations in which the parent bucket path is resolved.
 * @param reduceContext supplies the script service used to compile the selector script.
 * @return a [BucketSelectorIndices] holding the indices (positions within the parent
 * aggregation's bucket list) of the selected buckets, together with the parent aggregation's
 * metadata.
 */
override fun doReduce(aggregations: Aggregations, reduceContext: ReduceContext): InternalAggregation {
    val parentBucketPathList = AggregationPath.parse(parentBucketPath).pathElementsAsStringList

    // Descend through every intermediate path element. Each level must be looked up by its own
    // name (index i); always using the first element breaks paths deeper than two elements.
    var subAggregations: Aggregations = aggregations
    for (i in 0 until parentBucketPathList.size - 1) {
        subAggregations = subAggregations.get<SingleBucketAggregation>(parentBucketPathList[i]).aggregations
    }
    val originalAgg = subAggregations.get(parentBucketPathList.last()) as InternalMultiBucketAggregation<*, *>
    val buckets = originalAgg.buckets
    val factory = reduceContext.scriptService().compile(script, BucketAggregationSelectorScript.CONTEXT)

    // Capture the property once so the compiler can smart-cast it instead of relying on `!!`.
    val extFilter = bucketSelectorExtFilter
    val selectedBucketsIndex: MutableList<Int> = ArrayList()
    for (i in buckets.indices) {
        val bucket = buckets[i]
        if (extFilter != null) {
            var accepted = true
            if (extFilter.isCompositeAggregation) {
                val compBucketKeyObj = (bucket as InternalComposite.InternalBucket).key
                val filtersMap: HashMap<String, IncludeExclude>? = extFilter.filtersMap
                // With no filters map the bucket is accepted unconditionally. Otherwise every
                // source key of the composite bucket key must have a filter and pass it.
                if (filtersMap != null) {
                    for (sourceKey in compBucketKeyObj.keys) {
                        if (!filtersMap.containsKey(sourceKey)) {
                            accepted = false
                            break
                        }
                        // `!!` throws a NullPointerException if the value for this source key is null.
                        val obj = compBucketKeyObj[sourceKey]
                        accepted = isAccepted(obj!!, filtersMap[sourceKey])
                        if (!accepted) break
                    }
                }
            } else {
                accepted = isAccepted(bucket.key, extFilter.filters)
            }
            // Filtered-out buckets are skipped before the script is evaluated.
            if (!accepted) continue
        }

        // Script variables: the script's own params first, then one entry per configured
        // buckets path (a buckets-path variable overwrites a param of the same name).
        val vars: MutableMap<String, Any> = HashMap()
        if (script.params != null) {
            vars.putAll(script.params)
        }
        for ((varName, bucketsPath) in bucketsPathsMap) {
            val value = BucketHelpers.resolveBucketValue(originalAgg, bucket, bucketsPath, gapPolicy)
            vars[varName] = value
        }
        val executableScript = factory.newInstance(vars)
        // TODO: can we use one instance of the script for all buckets? it should be stateless?
        if (executableScript.execute()) {
            selectedBucketsIndex.add(i)
        }
    }
    return BucketSelectorIndices(
        name(), parentBucketPath, selectedBucketsIndex, originalAgg.metadata
    )
}