in alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt [101:167]
fun composeQueryLevelAlert(
ctx: QueryLevelTriggerExecutionContext,
result: QueryLevelTriggerRunResult,
alertError: AlertError?
): Alert? {
val currentTime = Instant.now()
val currentAlert = ctx.alert
val updatedActionExecutionResults = mutableListOf<ActionExecutionResult>()
val currentActionIds = mutableSetOf<String>()
if (currentAlert != null) {
// update current alert's action execution results
for (actionExecutionResult in currentAlert.actionExecutionResults) {
val actionId = actionExecutionResult.actionId
currentActionIds.add(actionId)
val actionRunResult = result.actionResults[actionId]
when {
actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult)
actionRunResult.throttled ->
updatedActionExecutionResults.add(
actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1
)
)
else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime))
}
}
// add action execution results which not exist in current alert
updatedActionExecutionResults.addAll(
result.actionResults.filter { !currentActionIds.contains(it.key) }
.map { ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) }
)
} else {
updatedActionExecutionResults.addAll(
result.actionResults.map {
ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0)
}
)
}
// Merge the alert's error message to the current alert's history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
currentAlert?.copy(
state = Alert.State.COMPLETED, endTime = currentTime, errorMessage = null,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
} else if (currentAlert != null) {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
currentAlert.copy(
state = alertState, lastNotificationTime = currentTime, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} else {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
Alert(
monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
}
}