in alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt [333:385]
/**
 * Indexes brand-new alerts into [AlertIndices.ALERT_INDEX] with a bulk request and returns the alerts that were
 * stored, each copied with the document ID taken from its bulk item response.
 *
 * Items rejected with [RestStatus.TOO_MANY_REQUESTS] are resubmitted through [retryPolicy]. Items that fail with
 * any other status are logged and left out of the result.
 *
 * @param alerts alerts to create; each must be in [Alert.State.ACTIVE] and must not have an ID yet
 * @param retryPolicy backoff policy handed to the retry helper for throttled items
 * @return the saved alerts carrying their new document IDs (empty when [alerts] is empty)
 * @throws IllegalStateException if an alert is not ACTIVE or already carries an ID
 */
suspend fun saveNewAlerts(alerts: List<Alert>, retryPolicy: BackoffPolicy): List<Alert> {
    val savedAlerts = mutableListOf<Alert>()
    // Alerts matching, position by position, the requests of the next bulk attempt.
    var alertsBeingIndexed = alerts
    var requestsToRetry: MutableList<IndexRequest> = alerts.map { alert ->
        check(alert.state == Alert.State.ACTIVE) {
            "Unexpected attempt to save new alert [$alert] with state [${alert.state}]"
        }
        check(alert.id == Alert.NO_ID) {
            "Unexpected attempt to save new alert [$alert] with an existing alert ID [${alert.id}]"
        }
        IndexRequest(AlertIndices.ALERT_INDEX)
            .routing(alert.monitorId)
            .source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
    }.toMutableList()
    if (requestsToRetry.isEmpty()) return emptyList()

    // Bulk item responses come back in the same order as the individual requests, so `itemId` indexes both the
    // request list and `alertsBeingIndexed`. Both lists are narrowed together to the throttled items so that the
    // pairing still holds on the next attempt.
    retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
        val bulkRequest = BulkRequest().add(requestsToRetry)
        val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) }
        val responseItems = bulkResponse.items ?: arrayOf()

        val (failedItems, succeededItems) = responseItems.partition { it.isFailed }
        // For a successful index operation the item ID is the document ID assigned to the alert.
        succeededItems.mapTo(savedAlerts) { item -> alertsBeingIndexed[item.itemId].copy(id = item.id) }

        val (throttledItems, rejectedItems) = failedItems.partition { it.status() == RestStatus.TOO_MANY_REQUESTS }
        // Non-retryable failures are not resubmitted; record them so the lost alerts can be traced.
        rejectedItems.forEach { item ->
            val alert = alertsBeingIndexed[item.itemId]
            logger.error(
                "Failed to save new alert for monitor [${alert.monitorId}] " +
                    "with status [${item.status()}]: ${item.failureMessage}"
            )
        }

        requestsToRetry = throttledItems.map { item -> bulkRequest.requests()[item.itemId] as IndexRequest }
            .toMutableList()
        alertsBeingIndexed = throttledItems.map { item -> alertsBeingIndexed[item.itemId] }

        // Surface the first throttling failure so the retry helper can react to the 429.
        throttledItems.firstOrNull()?.let { firstThrottled ->
            throw ExceptionsHelper.convertToOpenSearchException(firstThrottled.failure.cause)
        }
    }
    return savedAlerts
}