override fun doExecute()

in src/main/kotlin/org/opensearch/replication/action/setup/TransportSetupChecksAction.kt [58:150]


    /**
     * Runs the replication setup checks for [request] and reports the outcome to [listener].
     *
     * The checks are performed in this order, each one starting only after the previous succeeded:
     *  1. resolve the remote (leader) cluster client for `request.connectionName`;
     *  2. reject the request if a user is present in the follower or leader context but has no roles;
     *  3. validation on the local (follower) cluster with the flag set to `false`;
     *  4. validation on the remote (leader) cluster with the flag set to `false`;
     *  5. validation on the local (follower) cluster with the flag set to `true`;
     *  6. validation on the remote (leader) cluster with the flag set to `true`.
     *
     * Steps 3-4 are treated as "user" validation and steps 5-6 as "role" validation, following the
     * listener names; the exact meaning of the boolean flag is defined by `triggerPermissionsValidation`
     * (NOTE(review): confirm against that method).
     *
     * The first failure is forwarded to [listener] and the chain stops. On full success the response of
     * the last step is forwarded to [listener].
     *
     * @param task the task this action runs under (not used by this method)
     * @param request carries the connection name and the follower/leader contexts to validate
     * @param listener notified exactly once with the final response or the first failure
     */
    override fun doExecute(task: Task, request: SetupChecksRequest, listener: ActionListener<AcknowledgedResponse>) {
        val followerClusterName = clusterService.clusterName.value()
        // `try` is an expression: the catch branch returns, so the client is non-null from here on.
        val leaderClusterClient: Client = try {
            client.getRemoteClusterClient(request.connectionName)
        } catch (e: Exception) {
            // Logging it as info as this check is to see if leader cluster is added or not
            log.info("Failed to connect to remote cluster ${request.connectionName} with error $e")
            listener.onFailure(e)
            return
        }

        // If user obj is present, security plugin is enabled. Roles are mandatory
        val followerUser = request.followerContext.user
        if (followerUser != null && followerUser.roles.isEmpty()) {
            log.info("User roles are empty for follower_resource:${request.followerContext.resource}")
            listener.onFailure(OpenSearchSecurityException("Follower roles are mandatory for replication", RestStatus.FORBIDDEN))
            return
        }

        val leaderUser = request.leaderContext.user
        if (leaderUser != null && leaderUser.roles.isEmpty()) {
            log.info("User roles are empty for leader_resource:${request.leaderContext.resource}")
            listener.onFailure(OpenSearchSecurityException("Leader roles are mandatory for replication", RestStatus.FORBIDDEN))
            return
        }

        // A remote "action not found" is reported to the caller as replication not being enabled there.
        fun translateRemoteFailure(e: Exception): Exception =
            if (e is RemoteTransportException && e.cause is ActionNotFoundTransportException) {
                UnsupportedOperationException("Replication is not enabled on the remote domain")
            } else {
                e
            }

        val userPermissionsValidationAtLocal = StepListener<AcknowledgedResponse>()
        val userPermissionsValidationAtRemote = StepListener<AcknowledgedResponse>()
        val rolePermissionsValidationAtLocal = StepListener<AcknowledgedResponse>()
        val rolePermissionsValidationAtRemote = StepListener<AcknowledgedResponse>()

        // The listeners are wired last-step-first; execution starts with the trigger call at the bottom.

        // Final step: role validation on the leader. Success completes the whole action.
        rolePermissionsValidationAtRemote.whenComplete(
                { r ->
                    log.info("Permissions validation successful for role [connection:${request.connectionName}, " +
                            "resource:${request.leaderContext.resource}]")
                    listener.onResponse(r)
                },
                { e ->
                    val exceptionToThrow = translateRemoteFailure(e)
                    log.error("Permissions validation failed for role [connection:${request.connectionName}, " +
                            "resource:${request.leaderContext.resource}] with ${exceptionToThrow.stackTraceToString()}")
                    listener.onFailure(unwrapSecurityExceptionIfPresent(exceptionToThrow))
                }
        )

        // Third step: role validation on the follower. Success starts the role validation on the leader.
        rolePermissionsValidationAtLocal.whenComplete(
                {
                    log.info("Permissions validation successful for role [local:$followerClusterName, " +
                            "resource:${request.followerContext.resource}]")
                    triggerPermissionsValidation(leaderClusterClient, request.connectionName, request.leaderContext, true, rolePermissionsValidationAtRemote)
                },
                { e ->
                    log.error("Permissions validation failed for role [local:$followerClusterName, " +
                            "resource:${request.followerContext.resource}] with ${e.stackTraceToString()}")
                    listener.onFailure(unwrapSecurityExceptionIfPresent(e))
                }
        )

        // Second step: user validation on the leader. Success starts the role validation on the follower.
        userPermissionsValidationAtRemote.whenComplete(
                {
                    log.info("Permissions validation successful for User [connection:${request.connectionName}, " +
                            "resource:${request.leaderContext.resource}]")
                    triggerPermissionsValidation(client, followerClusterName, request.followerContext, true, rolePermissionsValidationAtLocal)
                },
                { e ->
                    val exceptionToThrow = translateRemoteFailure(e)
                    log.error("Permissions validation failed for User [connection:${request.connectionName}, " +
                            "resource:${request.leaderContext.resource}] with ${exceptionToThrow.stackTraceToString()}")
                    listener.onFailure(unwrapSecurityExceptionIfPresent(exceptionToThrow))
                }
        )

        // First step: user validation on the follower. Success starts the user validation on the leader.
        userPermissionsValidationAtLocal.whenComplete(
                {
                    log.info("Permissions validation successful for User [local:$followerClusterName, " +
                            "resource:${request.followerContext.resource}]")
                    triggerPermissionsValidation(leaderClusterClient, request.connectionName, request.leaderContext, false, userPermissionsValidationAtRemote)
                },
                { e ->
                    log.error("Permissions validation failed for User [local:$followerClusterName, " +
                            "resource:${request.followerContext.resource}] with ${e.stackTraceToString()}")
                    listener.onFailure(unwrapSecurityExceptionIfPresent(e))
                }
        )

        // Kick off the chain.
        triggerPermissionsValidation(client, followerClusterName, request.followerContext, false, userPermissionsValidationAtLocal)
    }