in src/main/kotlin/org/opensearch/replication/action/setup/TransportSetupChecksAction.kt [58:150]
/**
 * Runs the replication setup checks for [request] and reports the outcome on [listener].
 *
 * Steps, in order:
 * 1. Resolve the remote (leader) cluster client for `request.connectionName`; a failure here is
 *    passed to [listener] as-is.
 * 2. If a user object is present on the follower or leader context, require it to carry at least
 *    one role; otherwise fail with a FORBIDDEN [OpenSearchSecurityException].
 * 3. Chain four permission validations, each started only after the previous one succeeds:
 *    user at local -> user at remote -> role at local -> role at remote.
 *    The response of the last validation is forwarded to [listener].
 *
 * Any validation failure is logged and handed to [listener] after passing through
 * `unwrapSecurityExceptionIfPresent`. For the two remote validations, a
 * [RemoteTransportException] caused by [ActionNotFoundTransportException] is first replaced by an
 * [UnsupportedOperationException] stating that replication is not enabled on the remote domain.
 *
 * @param task the task this action runs under (not used by the checks themselves)
 * @param request carries the connection name plus the follower and leader contexts to validate
 * @param listener notified exactly once per path taken here: with the final acknowledgement, or
 *                 with the first failure encountered
 */
override fun doExecute(task: Task, request: SetupChecksRequest, listener: ActionListener<AcknowledgedResponse>) {
    val followerClusterName = clusterService.clusterName.value()
    val leaderClusterClient = try {
        client.getRemoteClusterClient(request.connectionName)
    } catch (e: Exception) {
        // Logging it as info as this check is to see if leader cluster is added or not
        log.info("Failed to connect to remote cluster ${request.connectionName} with error $e")
        listener.onFailure(e)
        return
    }

    // If user obj is present, security plugin is enabled. Roles are mandatory
    val followerUser = request.followerContext.user
    if (followerUser != null && followerUser.roles.isEmpty()) {
        log.info("User roles are empty for follower_resource:${request.followerContext.resource}")
        listener.onFailure(OpenSearchSecurityException("Follower roles are mandatory for replication", RestStatus.FORBIDDEN))
        return
    }
    val leaderUser = request.leaderContext.user
    if (leaderUser != null && leaderUser.roles.isEmpty()) {
        log.info("User roles are empty for leader_resource:${request.leaderContext.resource}")
        listener.onFailure(OpenSearchSecurityException("Leader roles are mandatory for replication", RestStatus.FORBIDDEN))
        return
    }

    // An action-not-found failure coming back from the remote cluster is reported as
    // replication not being enabled there; every other failure is passed through untouched.
    fun translateRemoteFailure(e: Exception): Exception =
        if ((e is RemoteTransportException) && (e.cause is ActionNotFoundTransportException)) {
            UnsupportedOperationException("Replication is not enabled on the remote domain")
        } else {
            e
        }

    val userPermissionsValidationAtLocal = StepListener<AcknowledgedResponse>()
    val userPermissionsValidationAtRemote = StepListener<AcknowledgedResponse>()
    val rolePermissionsValidationAtLocal = StepListener<AcknowledgedResponse>()
    val rolePermissionsValidationAtRemote = StepListener<AcknowledgedResponse>()

    // The listeners are wired last-step-first; execution order is the reverse of declaration order.

    // Step 4 (final): role validation at the remote cluster completes the whole action.
    rolePermissionsValidationAtRemote.whenComplete(
        { r ->
            log.info("Permissions validation successful for role [connection:${request.connectionName}, " +
                    "resource:${request.leaderContext.resource}]")
            listener.onResponse(r)
        },
        { e ->
            val exceptionToThrow = translateRemoteFailure(e)
            log.error("Permissions validation failed for role [connection:${request.connectionName}, " +
                    "resource:${request.leaderContext.resource}] with ${exceptionToThrow.stackTraceToString()}")
            listener.onFailure(unwrapSecurityExceptionIfPresent(exceptionToThrow))
        }
    )

    // Step 3: role validation at the local cluster, then move on to the remote role validation.
    rolePermissionsValidationAtLocal.whenComplete(
        {
            log.info("Permissions validation successful for role [local:$followerClusterName, " +
                    "resource:${request.followerContext.resource}]")
            triggerPermissionsValidation(leaderClusterClient, request.connectionName, request.leaderContext, true, rolePermissionsValidationAtRemote)
        },
        { e ->
            log.error("Permissions validation failed for role [local:$followerClusterName, " +
                    "resource:${request.followerContext.resource}] with ${e.stackTraceToString()}")
            listener.onFailure(unwrapSecurityExceptionIfPresent(e))
        }
    )

    // Step 2: user validation at the remote cluster, then move on to the local role validation.
    userPermissionsValidationAtRemote.whenComplete(
        {
            log.info("Permissions validation successful for User [connection:${request.connectionName}, " +
                    "resource:${request.leaderContext.resource}]")
            triggerPermissionsValidation(client, followerClusterName, request.followerContext, true, rolePermissionsValidationAtLocal)
        },
        { e ->
            val exceptionToThrow = translateRemoteFailure(e)
            log.error("Permissions validation failed for User [connection:${request.connectionName}, " +
                    "resource:${request.leaderContext.resource}] with ${exceptionToThrow.stackTraceToString()}")
            listener.onFailure(unwrapSecurityExceptionIfPresent(exceptionToThrow))
        }
    )

    // Step 1: user validation at the local cluster, then move on to the remote user validation.
    userPermissionsValidationAtLocal.whenComplete(
        {
            log.info("Permissions validation successful for User [local:$followerClusterName, " +
                    "resource:${request.followerContext.resource}]")
            triggerPermissionsValidation(leaderClusterClient, request.connectionName, request.leaderContext, false, userPermissionsValidationAtRemote)
        },
        { e ->
            log.error("Permissions validation failed for User [local:$followerClusterName, " +
                    "resource:${request.followerContext.resource}] with ${e.stackTraceToString()}")
            listener.onFailure(unwrapSecurityExceptionIfPresent(e))
        }
    )

    // Kick off the chain. NOTE(review): the boolean argument appears to select the role-based
    // check (true) versus the user-based check (false), judging by the listeners it is paired
    // with — confirm against triggerPermissionsValidation.
    triggerPermissionsValidation(client, followerClusterName, request.followerContext, false, userPermissionsValidationAtLocal)
}