in alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt [43:98]
suspend fun collectInputResults(
monitor: Monitor,
periodStart: Instant,
periodEnd: Instant,
prevResult: InputRunResults? = null
): InputRunResults {
return try {
val results = mutableListOf<Map<String, Any>>()
val aggTriggerAfterKey: MutableMap<String, TriggerAfterKey> = mutableMapOf()
// TODO: If/when multiple input queries are supported for Bucket-Level Monitor execution, aggTriggerAfterKeys will
// need to be updated to account for it
monitor.inputs.forEach { input ->
when (input) {
is SearchInput -> {
// TODO: Figure out a way to use SearchTemplateRequest without bringing in the entire TransportClient
val searchParams = mapOf(
"period_start" to periodStart.toEpochMilli(),
"period_end" to periodEnd.toEpochMilli()
)
// Deep copying query before passing it to rewriteQuery since otherwise, the monitor.input is modified directly
// which causes a strange bug where the rewritten query persists on the Monitor across executions
val rewrittenQuery = AggregationQueryRewriter.rewriteQuery(deepCopyQuery(input.query), prevResult, monitor.triggers)
val searchSource = scriptService.compile(
Script(
ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
rewrittenQuery.toString(), searchParams
),
TemplateScript.CONTEXT
)
.newInstance(searchParams)
.execute()
val searchRequest = SearchRequest().indices(*input.indices.toTypedArray())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
searchRequest.source(SearchSourceBuilder.fromXContent(it))
}
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
aggTriggerAfterKey += AggregationQueryRewriter.getAfterKeysFromSearchResponse(
searchResponse,
monitor.triggers,
prevResult?.aggTriggersAfterKey
)
results += searchResponse.convertToMap()
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
}
}
}
InputRunResults(results.toList(), aggTriggersAfterKey = aggTriggerAfterKey)
} catch (e: Exception) {
logger.info("Error collecting inputs for monitor: ${monitor.id}", e)
InputRunResults(emptyList(), e)
}
}