suspend fun saveNewAlerts()

in alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt [333:385]


    suspend fun saveNewAlerts(alerts: List<Alert>, retryPolicy: BackoffPolicy): List<Alert> {
        val savedAlerts = mutableListOf<Alert>()
        var alertsBeingIndexed = alerts
        var requestsToRetry: MutableList<IndexRequest> = alerts.map { alert ->
            if (alert.state != Alert.State.ACTIVE) {
                throw IllegalStateException("Unexpected attempt to save new alert [$alert] with state [${alert.state}]")
            }
            if (alert.id != Alert.NO_ID) {
                throw IllegalStateException("Unexpected attempt to save new alert [$alert] with an existing alert ID [${alert.id}]")
            }
            IndexRequest(AlertIndices.ALERT_INDEX)
                .routing(alert.monitorId)
                .source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
        }.toMutableList()

        if (requestsToRetry.isEmpty()) return listOf()

        // Retry Bulk requests if there was any 429 response.
        // The responses of a bulk request will be in the same order as the individual requests.
        // If the index request succeeded for an Alert, the document ID from the response is taken and saved in the Alert.
        // If the index request is to be retried, the Alert is saved separately as well so that its relative ordering is maintained in
        // relation to index request in the retried bulk request for when it eventually succeeds.
        retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
            val bulkRequest = BulkRequest().add(requestsToRetry)
            val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) }
            // TODO: This is only used to retrieve the retryCause, could instead fetch it from the bulkResponse iteration below
            val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed }

            requestsToRetry = mutableListOf()
            val alertsBeingRetried = mutableListOf<Alert>()
            bulkResponse.items.forEach { item ->
                if (item.isFailed) {
                    // TODO: What if the failure cause was not TOO_MANY_REQUESTS, should these be saved and logged?
                    if (item.status() == RestStatus.TOO_MANY_REQUESTS) {
                        requestsToRetry.add(bulkRequest.requests()[item.itemId] as IndexRequest)
                        alertsBeingRetried.add(alertsBeingIndexed[item.itemId])
                    }
                } else {
                    // The ID of the BulkItemResponse in this case is the document ID resulting from the DocWriteRequest operation
                    savedAlerts.add(alertsBeingIndexed[item.itemId].copy(id = item.id))
                }
            }

            alertsBeingIndexed = alertsBeingRetried

            if (requestsToRetry.isNotEmpty()) {
                val retryCause = failedResponses.first { it.status() == RestStatus.TOO_MANY_REQUESTS }.failure.cause
                throw ExceptionsHelper.convertToOpenSearchException(retryCause)
            }
        }

        return savedAlerts
    }