in src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt [57:100]
suspend fun indexRollups(rollup: Rollup, internalComposite: InternalComposite): RollupIndexResult {
try {
var requestsToRetry = convertResponseToRequests(rollup, internalComposite)
var stats = RollupStats(0, 0, requestsToRetry.size.toLong(), 0, 0)
val nonRetryableFailures = mutableListOf<BulkItemResponse>()
if (requestsToRetry.isNotEmpty()) {
retryIngestPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
if (it.seconds >= (Rollup.ROLLUP_LOCK_DURATION_SECONDS / 2)) {
throw ExceptionsHelper.convertToOpenSearchException(
IllegalStateException("Cannot retry ingestion with a delay more than half of the rollup lock TTL")
)
}
val bulkRequest = BulkRequest().add(requestsToRetry)
val bulkResponse: BulkResponse = client.suspendUntil { bulk(bulkRequest, it) }
stats = stats.copy(indexTimeInMillis = stats.indexTimeInMillis + bulkResponse.took.millis)
val retryableFailures = mutableListOf<BulkItemResponse>()
(bulkResponse.items ?: arrayOf()).filter { it.isFailed }.forEach { failedResponse ->
if (failedResponse.status() == RestStatus.TOO_MANY_REQUESTS) {
retryableFailures.add(failedResponse)
} else {
nonRetryableFailures.add(failedResponse)
}
}
requestsToRetry = retryableFailures.map { retryableFailure -> bulkRequest.requests()[retryableFailure.itemId] as IndexRequest }
if (requestsToRetry.isNotEmpty()) {
val retryCause = retryableFailures.first().failure.cause
throw ExceptionsHelper.convertToOpenSearchException(retryCause)
}
}
}
if (nonRetryableFailures.isNotEmpty()) {
logger.error("Failed to index ${nonRetryableFailures.size} documents")
throw ExceptionsHelper.convertToOpenSearchException(nonRetryableFailures.first().failure.cause)
}
return RollupIndexResult.Success(stats)
} catch (e: RemoteTransportException) {
logger.error(e.message, e.cause)
return RollupIndexResult.Failure(cause = ExceptionsHelper.unwrapCause(e) as Exception)
} catch (e: Exception) {
logger.error(e.message, e.cause)
return RollupIndexResult.Failure(cause = e)
}
}