in alerting/src/main/kotlin/org/opensearch/alerting/util/AggregationQueryRewriter.kt [28:64]
/**
 * Rewrites [query] in place for bucket-level triggers and returns it.
 *
 * For every [BucketLevelTrigger] in [triggers], the trigger's bucket selector is added to [query]
 * as an aggregation. If [prevResult] holds an after-key entry for that trigger's id, the
 * aggregation addressed by the bucket selector's parent bucket path is located in the query and
 * the stored after key is applied to it via `aggregateAfter`.
 *
 * Triggers that are not [BucketLevelTrigger]s are ignored.
 *
 * @param query the search source to rewrite; it is mutated and is also the returned instance.
 * @param prevResult results of the previous run, or `null`; when `null` (or when it has no
 * after-key entry for a trigger) only the bucket selector is added.
 * @param triggers the triggers to process.
 * @return the same [query] instance that was passed in.
 * @throws IllegalArgumentException if an element of the parent bucket path does not match any
 * aggregation at the corresponding nesting level of the query.
 * @throws IllegalStateException if an after-key entry exists but the aggregation resolved from the
 * parent bucket path is not a [CompositeAggregationBuilder].
 */
fun rewriteQuery(query: SearchSourceBuilder, prevResult: InputRunResults?, triggers: List<Trigger>): SearchSourceBuilder {
    for (trigger in triggers) {
        if (trigger !is BucketLevelTrigger) continue

        // Add the bucket selector pipeline aggregation of this trigger to the query.
        query.aggregation(trigger.bucketSelector)

        // An after-key entry is only present when a subsequent page of the input query result is
        // being processed; without one there is nothing more to rewrite for this trigger.
        val triggerAfterKey = prevResult?.aggTriggersAfterKey?.get(trigger.id) ?: continue

        // Walk the parent bucket path one element at a time, descending into the sub-aggregations
        // of each matched aggregation.
        val parentBucketPath = AggregationPath.parse(trigger.bucketSelector.parentBucketPath)
        var candidates = (query.aggregations() as AggregatorFactories.Builder).aggregatorFactories
        var resolved: AggregationBuilder? = null
        for (pathElement in parentBucketPath.pathElements) {
            val match = candidates.firstOrNull { it.name == pathElement.name }
                ?: throw IllegalArgumentException("ParentBucketPath: $parentBucketPath not found in input query results")
            candidates = match.subAggregations
            resolved = match
        }

        // `resolved` is still null here if the path had no elements; that case is rejected by the
        // same check that rejects a non-composite aggregation.
        val composite = resolved as? CompositeAggregationBuilder
            ?: throw IllegalStateException("AfterKeys are not expected to be present in non CompositeAggregationBuilder")

        // NOTE(review): it is still an open question whether an afterKey value of null in the
        // previous result means "result set exhausted" or "first page"; the value is forwarded
        // as-is.
        composite.aggregateAfter(triggerAfterKey.afterKey)
    }
    return query
}