suspend fun runBucketLevelMonitor()

in alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt [315:555]


    suspend fun runBucketLevelMonitor(
        monitor: Monitor,
        periodStart: Instant,
        periodEnd: Instant,
        dryrun: Boolean = false
    ): MonitorRunResult<BucketLevelTriggerRunResult> {
        val roles = getRolesForMonitor(monitor)
        logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}")

        if (periodStart == periodEnd) {
            logger.warn("Start and end time are the same: $periodStart. This monitor will probably only run once.")
        }

        var monitorResult = MonitorRunResult<BucketLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
        val currentAlerts = try {
            alertIndices.createOrUpdateAlertIndex()
            alertIndices.createOrUpdateInitialHistoryIndex()
            alertService.loadCurrentAlertsForBucketLevelMonitor(monitor)
        } catch (e: Exception) {
            // We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
            val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id
            logger.error("Error loading alerts for monitor: $id", e)
            return monitorResult.copy(error = e)
        }

        /*
         * Since the aggregation query can consist of multiple pages, each iteration of the do-while loop only has partial results
         * from the runBucketLevelTrigger results whereas the currentAlerts has a complete view of existing Alerts. This means that
         * it can be confirmed if an Alert is new or de-duped local to the do-while loop if a key appears or doesn't appear in
         * the currentAlerts. However, it cannot be guaranteed that an existing Alert is COMPLETED until all pages have been
         * iterated over (since a bucket that did not appear in one page of the aggregation results, could appear in a later page).
         *
         * To solve for this, the currentAlerts will be acting as a list of "potentially completed alerts" throughout the execution.
         * When categorizing the Alerts in each iteration, de-duped Alerts will be removed from the currentAlerts map
         * (for the Trigger being executed) and the Alerts left in currentAlerts after all pages have been iterated through can
         * be marked as COMPLETED since they were never de-duped.
         *
         * Meanwhile, the nextAlerts map will contain Alerts that will exist at the end of this Monitor execution. It is a compilation
         * across Triggers because in the case of executing actions at a PER_EXECUTION frequency, all the Alerts are needed before executing
         * Actions which can only be done once all of the aggregation results (and Triggers given the pagination logic) have been evaluated.
         */
        val triggerResults = mutableMapOf<String, BucketLevelTriggerRunResult>()
        val triggerContexts = mutableMapOf<String, BucketLevelTriggerExecutionContext>()
        val nextAlerts = mutableMapOf<String, MutableMap<AlertCategory, MutableList<Alert>>>()
        var firstIteration = true
        var firstPageOfInputResults = InputRunResults(listOf(), null)
        do {
            // TODO: Since a composite aggregation is being used for the input query, the total bucket count cannot be determined.
            //  If a setting is imposed that limits buckets that can be processed for Bucket-Level Monitors, we'd need to iterate over
            //  the buckets until we hit that threshold. In that case, we'd want to exit the execution without creating any alerts since the
            //  buckets we iterate over before hitting the limit is not deterministic. Is there a better way to fail faster in this case?
            withClosableContext(InjectorContextElement(monitor.id, settings, threadPool.threadContext, roles)) {
                // Storing the first page of results in the case of pagination input results to prevent empty results
                // in the final output of monitorResult which occurs when all pages have been exhausted.
                // If it's favorable to return the last page, will need to check how to accomplish that with multiple aggregation paths
                // with different page counts.
                val inputResults = inputService.collectInputResults(monitor, periodStart, periodEnd, monitorResult.inputResults)
                if (firstIteration) {
                    firstPageOfInputResults = inputResults
                    firstIteration = false
                }
                monitorResult = monitorResult.copy(inputResults = inputResults)
            }

            for (trigger in monitor.triggers) {
                // The currentAlerts map is formed by iterating over the Monitor's Triggers as keys so null should not be returned here
                val currentAlertsForTrigger = currentAlerts[trigger]!!
                val triggerCtx = BucketLevelTriggerExecutionContext(monitor, trigger as BucketLevelTrigger, monitorResult)
                triggerContexts[trigger.id] = triggerCtx
                val triggerResult = triggerService.runBucketLevelTrigger(monitor, trigger, triggerCtx)
                triggerResults[trigger.id] = triggerResult.getCombinedTriggerRunResult(triggerResults[trigger.id])

                /*
                 * If an error was encountered when running the trigger, it means that something went wrong when parsing the input results
                 * for the filtered buckets returned from the pipeline bucket selector injected into the input query.
                 *
                 * In this case, the returned aggregation result buckets are empty so the categorization of the Alerts that happens below
                 * should be skipped/invalidated since comparing the current Alerts to an empty result will lead the execution to believe
                 * that all Alerts have been COMPLETED. Not doing so would mean it would not be possible to propagate the error into the
                 * existing Alerts in a way the user can easily view them since they will have all been moved to the history index.
                 */
                if (triggerResults[trigger.id]?.error != null) continue

                // TODO: Should triggerResult's aggregationResultBucket be a list? If not, getCategorizedAlertsForBucketLevelMonitor can
                //  be refactored to use a map instead
                val categorizedAlerts = alertService.getCategorizedAlertsForBucketLevelMonitor(
                    monitor, trigger, currentAlertsForTrigger, triggerResult.aggregationResultBuckets.values.toList()
                ).toMutableMap()
                val dedupedAlerts = categorizedAlerts.getOrDefault(AlertCategory.DEDUPED, emptyList())
                var newAlerts = categorizedAlerts.getOrDefault(AlertCategory.NEW, emptyList())

                /*
                 * Index de-duped and new Alerts here (if it's not a test Monitor) so they are available at the time the Actions are executed.
                 *
                 * The new Alerts have to be returned and saved back with their indexed doc ID to prevent duplicate documents
                 * when the Alerts are updated again after Action execution.
                 *
                 * Note: Index operations can fail for various reasons (such as write blocks on cluster), in such a case, the Actions
                 * will still execute with the Alert information in the ctx but the Alerts may not be visible.
                 */
                if (!dryrun && monitor.id != Monitor.NO_ID) {
                    alertService.saveAlerts(dedupedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = true)
                    newAlerts = alertService.saveNewAlerts(newAlerts, retryPolicy)
                }

                // Store deduped and new Alerts to accumulate across pages
                if (!nextAlerts.containsKey(trigger.id)) {
                    nextAlerts[trigger.id] = mutableMapOf(
                        AlertCategory.DEDUPED to mutableListOf(),
                        AlertCategory.NEW to mutableListOf(),
                        AlertCategory.COMPLETED to mutableListOf()
                    )
                }
                nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED)?.addAll(dedupedAlerts)
                nextAlerts[trigger.id]?.get(AlertCategory.NEW)?.addAll(newAlerts)
            }
        } while (monitorResult.inputResults.afterKeysPresent())

        // The completed Alerts are whatever are left in the currentAlerts.
        // However, this operation will only be done if there was no trigger error, since otherwise the nextAlerts were not collected
        // in favor of just using the currentAlerts as-is.
        currentAlerts.forEach { (trigger, keysToAlertsMap) ->
            if (triggerResults[trigger.id]?.error == null)
                nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED)?.addAll(alertService.convertToCompletedAlerts(keysToAlertsMap))
        }

        for (trigger in monitor.triggers) {
            val alertsToUpdate = mutableSetOf<Alert>()
            val completedAlertsToUpdate = mutableSetOf<Alert>()
            // Filter ACKNOWLEDGED Alerts from the deduped list so they do not have Actions executed for them.
            // New Alerts are ignored since they cannot be acknowledged yet.
            val dedupedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED)
                ?.filterNot { it.state == Alert.State.ACKNOWLEDGED }?.toMutableList()
                ?: mutableListOf()
            // Update nextAlerts so the filtered DEDUPED Alerts are reflected for PER_ALERT Action execution
            nextAlerts[trigger.id]?.set(AlertCategory.DEDUPED, dedupedAlerts)
            val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory.NEW) ?: mutableListOf()
            val completedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED) ?: mutableListOf()

            // Adding all the COMPLETED Alerts to a separate set and removing them if they get added
            // to alertsToUpdate to ensure the Alert doc is updated at the end in either case
            completedAlertsToUpdate.addAll(completedAlerts)

            // All trigger contexts and results should be available at this point since all triggers were evaluated in the main do-while loop
            val triggerCtx = triggerContexts[trigger.id]!!
            val triggerResult = triggerResults[trigger.id]!!
            val monitorOrTriggerError = monitorResult.error ?: triggerResult.error
            val shouldDefaultToPerExecution = defaultToPerExecutionAction(
                monitorId = monitor.id,
                triggerId = trigger.id,
                totalActionableAlertCount = dedupedAlerts.size + newAlerts.size + completedAlerts.size,
                monitorOrTriggerError = monitorOrTriggerError
            )
            for (action in trigger.actions) {
                // ActionExecutionPolicy should not be null for Bucket-Level Monitors since it has a default config when not set explicitly
                val actionExecutionScope = action.getActionExecutionPolicy(monitor)!!.actionExecutionScope
                if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) {
                    for (alertCategory in actionExecutionScope.actionableAlerts) {
                        val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
                        for (alert in alertsToExecuteActionsFor) {
                            val actionCtx = getActionContextForAlertCategory(
                                alertCategory, alert, triggerCtx, monitorOrTriggerError
                            )
                            // AggregationResultBucket should not be null here
                            val alertBucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash()
                            if (!triggerResult.actionResultsMap.containsKey(alertBucketKeysHash)) {
                                triggerResult.actionResultsMap[alertBucketKeysHash] = mutableMapOf()
                            }

                            // Keeping the throttled response separate from runAction for now since
                            // throttling is not supported for PER_EXECUTION
                            val actionResult = if (isActionActionable(action, alert)) {
                                runAction(action, actionCtx, dryrun)
                            } else {
                                ActionRunResult(action.id, action.name, mapOf(), true, null, null)
                            }

                            triggerResult.actionResultsMap[alertBucketKeysHash]?.set(action.id, actionResult)
                            alertsToUpdate.add(alert)
                            // Remove the alert from completedAlertsToUpdate in case it is present there since
                            // its update will be handled in the alertsToUpdate batch
                            completedAlertsToUpdate.remove(alert)
                        }
                    }
                } else if (actionExecutionScope is PerExecutionActionScope || shouldDefaultToPerExecution) {
                    // If all categories of Alerts are empty, there is nothing to message on and we can skip the Action.
                    // If the error is not null, this is disregarded and the Action is executed anyway so the user can be notified.
                    if (monitorOrTriggerError == null && dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty())
                        continue

                    val actionCtx = triggerCtx.copy(
                        dedupedAlerts = dedupedAlerts,
                        newAlerts = newAlerts,
                        completedAlerts = completedAlerts,
                        error = monitorResult.error ?: triggerResult.error
                    )
                    val actionResult = runAction(action, actionCtx, dryrun)
                    // If there was an error during trigger execution then the Alerts to be updated are the current Alerts since the state
                    // was not changed. Otherwise, the Alerts to be updated are the sum of the deduped, new and completed Alerts.
                    val alertsToIterate = if (monitorOrTriggerError == null) {
                        (dedupedAlerts + newAlerts + completedAlerts)
                    } else currentAlerts[trigger]?.map { it.value } ?: listOf()
                    // Save the Action run result for every Alert
                    for (alert in alertsToIterate) {
                        val alertBucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash()
                        if (!triggerResult.actionResultsMap.containsKey(alertBucketKeysHash)) {
                            triggerResult.actionResultsMap[alertBucketKeysHash] = mutableMapOf()
                        }
                        triggerResult.actionResultsMap[alertBucketKeysHash]?.set(action.id, actionResult)
                        alertsToUpdate.add(alert)
                        // Remove the alert from completedAlertsToUpdate in case it is present there since
                        // its update will be handled in the alertsToUpdate batch
                        completedAlertsToUpdate.remove(alert)
                    }
                }
            }

            // Alerts are only added to alertsToUpdate after Action execution meaning the action results for it should be present
            // in the actionResultsMap but returning a default value when accessing the map to be safe.
            val updatedAlerts = alertsToUpdate.map { alert ->
                val bucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash()
                val actionResults = triggerResult.actionResultsMap.getOrDefault(bucketKeysHash, emptyMap<String, ActionRunResult>())
                alertService.updateActionResultsForBucketLevelAlert(
                    alert.copy(lastNotificationTime = currentTime()),
                    actionResults,
                    // TODO: Update BucketLevelTriggerRunResult.alertError() to retrieve error based on the first failed Action
                    monitorResult.alertError() ?: triggerResult.alertError()
                )
            }

            // Update Alerts with action execution results (if it's not a test Monitor).
            // ACKNOWLEDGED Alerts should not be saved here since actions are not executed for them.
            if (!dryrun && monitor.id != Monitor.NO_ID) {
                alertService.saveAlerts(updatedAlerts, retryPolicy, allowUpdatingAcknowledgedAlert = false)
                // Save any COMPLETED Alerts that were not covered in updatedAlerts
                alertService.saveAlerts(completedAlertsToUpdate.toList(), retryPolicy, allowUpdatingAcknowledgedAlert = false)
            }
        }

        return monitorResult.copy(inputResults = firstPageOfInputResults, triggerResults = triggerResults)
    }