suspend fun index()

in src/main/kotlin/org/opensearch/indexmanagement/transform/TransformIndexer.kt [71:115]


    /**
     * Sends [docsToIndex] as a bulk request and re-sends the items that were rejected with
     * [RestStatus.TOO_MANY_REQUESTS].
     *
     * The target index name is taken from the first request in the list and passed to
     * `createTargetIndex` before the first bulk call. An empty list results in no calls at all.
     *
     * Items that fail with any other status are collected across all attempts and reported only
     * after the retry loop has finished, using the cause of the first such failure.
     *
     * @param docsToIndex the write requests to send; may be of any [DocWriteRequest] subtype.
     * @return the sum of the `took` times, in milliseconds, reported by every bulk response received
     * (0 when [docsToIndex] is empty).
     * @throws TransformIndexException for every failure raised inside this method: exhausted retries,
     * non-retryable item failures, transport errors and missing index permissions.
     */
    suspend fun index(docsToIndex: List<DocWriteRequest<*>>): Long {
        // Shrinks on every attempt to just the requests that still need to be re-sent.
        var updatableDocsToIndex = docsToIndex
        var indexTimeInMillis = 0L
        val nonRetryableFailures = mutableListOf<BulkItemResponse>()
        try {
            if (updatableDocsToIndex.isNotEmpty()) {
                val targetIndex = updatableDocsToIndex.first().index()
                logger.debug("Attempting to index ${updatableDocsToIndex.size} documents to $targetIndex")
                createTargetIndex(targetIndex)
                // NOTE(review): retry() is defined elsewhere; presumably it re-runs the block when the thrown
                // exception maps to one of the listed statuses - confirm against its implementation.
                backoffPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
                    val bulkRequest = BulkRequest().add(updatableDocsToIndex)
                    val bulkResponse: BulkResponse = client.suspendUntil { bulk(bulkRequest, it) }
                    indexTimeInMillis += bulkResponse.took.millis

                    val (retryableFailures, otherFailures) = (bulkResponse.items ?: arrayOf())
                        .filter { it.isFailed }
                        .partition { it.status() == RestStatus.TOO_MANY_REQUESTS }
                    nonRetryableFailures.addAll(otherFailures)

                    // itemId is used as the position of the item within the request list that was just sent,
                    // so the lookup must read the current list before it is replaced. No cast is applied:
                    // the requests are re-sent as whatever DocWriteRequest subtype the caller supplied.
                    updatableDocsToIndex = retryableFailures.map { failure -> updatableDocsToIndex[failure.itemId] }
                    if (updatableDocsToIndex.isNotEmpty()) {
                        // Surface the throttling error so the retry helper can decide whether to try again.
                        throw ExceptionsHelper.convertToOpenSearchException(retryableFailures.first().failure.cause)
                    }
                }
            }
            if (nonRetryableFailures.isNotEmpty()) {
                logger.error("Failed to index ${nonRetryableFailures.size} documents")
                throw ExceptionsHelper.convertToOpenSearchException(nonRetryableFailures.first().failure.cause)
            }
            return indexTimeInMillis
        } catch (e: TransformIndexException) {
            // Already in the shape callers expect; do not wrap a second time.
            throw e
        } catch (e: RemoteTransportException) {
            // unwrapCause returns a Throwable; fall back to the transport exception itself when the
            // unwrapped cause is not an Exception, instead of failing with a ClassCastException here.
            val unwrappedException = ExceptionsHelper.unwrapCause(e) as? Exception ?: e
            throw TransformIndexException("Failed to index the documents", unwrappedException)
        } catch (e: OpenSearchSecurityException) {
            throw TransformIndexException("Failed to index the documents - missing required index permissions: ${e.localizedMessage}", e)
        } catch (e: Exception) {
            // NOTE(review): this also wraps a coroutine CancellationException raised while suspended;
            // confirm callers rely on that before changing it.
            throw TransformIndexException("Failed to index the documents", e)
        }
    }