in src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt [71:115]
suspend fun index(docsToIndex: List<DocWriteRequest<*>>): Long {
var updatableDocsToIndex = docsToIndex
var indexTimeInMillis = 0L
val nonRetryableFailures = mutableListOf<BulkItemResponse>()
try {
if (updatableDocsToIndex.isNotEmpty()) {
val targetIndex = updatableDocsToIndex.first().index()
logger.debug("Attempting to index ${updatableDocsToIndex.size} documents to $targetIndex")
createTargetIndex(targetIndex)
backoffPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(updatableDocsToIndex)
val bulkResponse: BulkResponse = client.suspendUntil { bulk(bulkRequest, it) }
indexTimeInMillis += bulkResponse.took.millis
val retryableFailures = mutableListOf<BulkItemResponse>()
(bulkResponse.items ?: arrayOf()).filter { it.isFailed }.forEach { failedResponse ->
if (failedResponse.status() == RestStatus.TOO_MANY_REQUESTS) {
retryableFailures.add(failedResponse)
} else {
nonRetryableFailures.add(failedResponse)
}
}
updatableDocsToIndex = retryableFailures.map { failure ->
updatableDocsToIndex[failure.itemId] as IndexRequest
}
if (updatableDocsToIndex.isNotEmpty()) {
throw ExceptionsHelper.convertToOpenSearchException(retryableFailures.first().failure.cause)
}
}
}
if (nonRetryableFailures.isNotEmpty()) {
logger.error("Failed to index ${nonRetryableFailures.size} documents")
throw ExceptionsHelper.convertToOpenSearchException(nonRetryableFailures.first().failure.cause)
}
return indexTimeInMillis
} catch (e: TransformIndexException) {
throw e
} catch (e: RemoteTransportException) {
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
throw TransformIndexException("Failed to index the documents", unwrappedException)
} catch (e: OpenSearchSecurityException) {
throw TransformIndexException("Failed to index the documents - missing required index permissions: ${e.localizedMessage}", e)
} catch (e: Exception) {
throw TransformIndexException("Failed to index the documents", e)
}
}