in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/forcemerge/AttemptCallForceMergeStep.kt [44:89]
/**
 * Issues a force merge request for [indexName] and waits a bounded amount of time for its outcome.
 *
 * The request is launched in [GlobalScope] on [Dispatchers.IO], so it is not a child of the calling
 * coroutine and keeps running if this function returns before the request finishes. This function
 * then polls every [FIVE_SECONDS_IN_MILLIS] until the background job completes or
 * [FIVE_MINUTES_IN_MILLIS] have elapsed since the call started.
 *
 * Outcomes:
 * - the background job captured a throwable: it is rethrown here; [Exception]s are routed to
 *   `handleException` (a [RemoteTransportException] is unwrapped first), anything else propagates;
 * - no response within the wait window, or a response with [RestStatus.OK]: the step is marked
 *   [StepStatus.COMPLETED];
 * - a response with any other status: the step is marked [StepStatus.FAILED] and `info` carries the
 *   status and the shard failure causes.
 *
 * @return this step, with `stepStatus` and `info` updated
 */
override suspend fun execute(): AttemptCallForceMergeStep {
    try {
        val startTime = Instant.now().toEpochMilli()
        val request = ForceMergeRequest(indexName).maxNumSegments(config.maxNumSegments)
        // Written only by the background coroutine below. They are read by this coroutine only after
        // mergeJob.isCompleted has been observed as true, so the reads are ordered after the writes.
        var response: ForceMergeResponse? = null
        var throwable: Throwable? = null
        val mergeJob = GlobalScope.launch(Dispatchers.IO + CoroutineName("ISM-ForceMerge-$indexName")) {
            try {
                response = client.admin().indices().suspendUntil { forceMerge(request, it) }
                if (response?.status == RestStatus.OK) {
                    logger.info(getSuccessMessage(indexName))
                } else {
                    logger.warn(getFailedMessage(indexName))
                }
            } catch (t: Throwable) {
                // Hand the failure over to the polling coroutine instead of losing it in GlobalScope.
                throwable = t
            }
        }

        // Wait for the background job to finish, but never longer than the five minute window.
        while (!mergeJob.isCompleted && (Instant.now().toEpochMilli() - startTime) < FIVE_MINUTES_IN_MILLIS) {
            delay(FIVE_SECONDS_IN_MILLIS)
        }

        // Snapshot completion once so the failure check and the response read agree with each other.
        val finished = mergeJob.isCompleted
        if (finished) {
            throwable?.let { throw it }
        }
        // A job that is still running has produced no usable response yet.
        val shadowedResponse = if (finished) response else null

        if (shadowedResponse == null || shadowedResponse.status == RestStatus.OK) {
            stepStatus = StepStatus.COMPLETED
            info = mapOf("message" to if (shadowedResponse == null) getSuccessfulCallMessage(indexName) else getSuccessMessage(indexName))
        } else {
            // Otherwise the request to force merge encountered some problem
            stepStatus = StepStatus.FAILED
            info = mapOf(
                "message" to getFailedMessage(indexName),
                "status" to shadowedResponse.status,
                "shard_failures" to shadowedResponse.shardFailures.map { it.getUsefulCauseString() }
            )
        }
    } catch (e: RemoteTransportException) {
        // unwrapCause returns a Throwable; fall back to the wrapper itself when the cause is not an
        // Exception, rather than failing with a ClassCastException.
        handleException((ExceptionsHelper.unwrapCause(e) as? Exception) ?: e)
    } catch (e: Exception) {
        handleException(e)
    }
    return this
}