in src/main/kotlin/org/opensearch/indexmanagement/transform/TransformValidator.kt [55:86]
/**
 * Validates that [transform] can currently be executed against its source indices.
 *
 * The checks run in order and the first failing stage short-circuits the rest:
 * 1. If the circuit breaker is enabled and the JVM heap used percent exceeds
 *    [circuitBreakerJvmThreshold], validation fails immediately.
 * 2. The transform's source index expression must resolve to at least one concrete index
 *    in the current cluster state.
 * 3. A cluster health request (waiting for yellow status) on those indices must not time out.
 * 4. Every concrete index is passed to [validateIndex]; all reported issues are collected.
 *
 * @param transform the transform job whose source indices are validated
 * @return a [TransformValidationResult] built with `true` and an empty issue list when every
 * check passes, otherwise with `false` and the list of issues found
 * @throws TransformValidationException if any non-cancellation exception is raised while validating
 */
suspend fun validate(transform: Transform): TransformValidationResult {
    val errorMessage = "Failed to validate the transform job"
    try {
        // Stage 1: refuse to proceed while the node is above the configured heap threshold.
        if (circuitBreakerEnabled && jvmService.stats().mem.heapUsedPercent > circuitBreakerJvmThreshold) {
            return TransformValidationResult(
                false,
                listOf("The cluster is breaching the jvm usage threshold [$circuitBreakerJvmThreshold], cannot execute the transform")
            )
        }

        // Stage 2: expand the source index expression into concrete index names.
        val concreteIndices =
            indexNameExpressionResolver.concreteIndexNames(clusterService.state(), IndicesOptions.lenientExpand(), true, transform.sourceIndex)
        if (concreteIndices.isEmpty()) {
            return TransformValidationResult(false, listOf("No specified source index exist in the cluster"))
        }

        // Stage 3: make sure the resolved indices reach at least yellow health before inspecting them.
        val request = ClusterHealthRequest()
            .indices(*concreteIndices)
            .waitForYellowStatus()
        val response: ClusterHealthResponse = client.suspendUntil { execute(ClusterHealthAction.INSTANCE, request, it) }
        if (response.isTimedOut) {
            return TransformValidationResult(false, listOf("Cannot determine that the requested source indices are healthy"))
        }

        // Stage 4: gather per-index issues; the transform is valid only when none were reported.
        val issues = mutableListOf<String>()
        for (index in concreteIndices) {
            issues.addAll(validateIndex(index, transform))
        }
        return TransformValidationResult(issues.isEmpty(), issues)
    } catch (e: java.util.concurrent.CancellationException) {
        // Coroutine cancellation must propagate untouched (Kotlin's CancellationException is this
        // class on the JVM); wrapping it would turn a cancelled job into a bogus validation failure.
        throw e
    } catch (e: RemoteTransportException) {
        // unwrapCause returns a Throwable; fall back to the transport exception itself when the
        // underlying cause is not an Exception, instead of failing with a ClassCastException.
        val unwrappedException = ExceptionsHelper.unwrapCause(e) as? Exception ?: e
        throw TransformValidationException(errorMessage, unwrappedException)
    } catch (e: OpenSearchSecurityException) {
        throw TransformValidationException("$errorMessage - missing required index permissions: ${e.localizedMessage}")
    } catch (e: Exception) {
        throw TransformValidationException(errorMessage, e)
    }
}