override fun doExecute()

in src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt [59:113]


    override fun doExecute(task: Task, request: ReplicateIndexRequest, listener: ActionListener<ReplicateIndexResponse>) {
        launch(threadPool.coroutineContext()) {
            listener.completeWith {
                log.info("Setting-up replication for ${request.leaderAlias}:${request.leaderIndex} -> ${request.followerIndex}")
                val user = SecurityContext.fromSecurityThreadContext(threadPool.threadContext)

                val followerReplContext = ReplicationContext(request.followerIndex,
                        user?.overrideFgacRole(request.useRoles?.get(ReplicateIndexRequest.FOLLOWER_CLUSTER_ROLE)))
                val leaderReplContext = ReplicationContext(request.leaderIndex,
                        user?.overrideFgacRole(request.useRoles?.get(ReplicateIndexRequest.LEADER_CLUSTER_ROLE)))

                // For autofollow request, setup checks are already made during addition of the pattern with
                // original user
                if(!request.isAutoFollowRequest) {
                    val setupChecksReq = SetupChecksRequest(followerReplContext, leaderReplContext, request.leaderAlias)
                    val setupChecksRes = client.suspendExecute(SetupChecksAction.INSTANCE, setupChecksReq)
                    if(!setupChecksRes.isAcknowledged) {
                        log.error("Setup checks failed while triggering replication for ${request.leaderAlias}:${request.leaderIndex} -> " +
                                "${request.followerIndex}")
                        throw org.opensearch.replication.ReplicationException("Setup checks failed while setting-up replication for ${request.followerIndex}")
                    }
                }

                // Any checks on the settings is followed by setup checks to ensure all relevant changes are
                // present across the plugins
                // validate index metadata on the leader cluster
                val leaderIndexMetadata = getLeaderIndexMetadata(request.leaderAlias, request.leaderIndex)
                ValidationUtil.validateLeaderIndexMetadata(leaderIndexMetadata)

                val leaderSettings = getLeaderIndexSettings(request.leaderAlias, request.leaderIndex)

                if (leaderSettings.keySet().contains(ReplicationPlugin.REPLICATED_INDEX_SETTING.key) and
                        !leaderSettings.get(ReplicationPlugin.REPLICATED_INDEX_SETTING.key).isNullOrBlank()) {
                    throw IllegalArgumentException("Cannot Replicate a Replicated Index ${request.leaderIndex}")
                }
                if (!leaderSettings.getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.key, true)) {
                    throw IllegalArgumentException("Cannot Replicate an index where the setting ${IndexSettings.INDEX_SOFT_DELETES_SETTING.key} is disabled")
                }

                // For k-NN indices, k-NN loads its own engine and this conflicts with the replication follower engine
                // Blocking k-NN indices for replication
                if(leaderSettings.getAsBoolean(KNN_INDEX_SETTING, false)) {
                    throw IllegalArgumentException("Cannot replicate k-NN index - ${request.leaderIndex}")
                }

                ValidationUtil.validateAnalyzerSettings(environment, leaderSettings, request.settings)

                // Setup checks are successful and trigger replication for the index
                // permissions evaluation to trigger replication is based on the current security context set
                val internalReq = ReplicateIndexMasterNodeRequest(user, request)
                client.suspendExecute(ReplicateIndexMasterNodeAction.INSTANCE, internalReq)
                ReplicateIndexResponse(true)
            }
        }
    }