override fun masterOperation()

in src/main/kotlin/org/opensearch/replication/action/resume/TransportResumeIndexReplicationAction.kt [83:131]


    /**
     * Resumes replication for the follower index named in [request].
     *
     * The work runs in a coroutine and proceeds in this order:
     *  1. validate the request and load the stored replication metadata for the index;
     *  2. resolve the leader index metadata and check that replication is resumable;
     *  3. fetch the leader index settings from the remote cluster, reject k-NN enabled
     *     leader indices and run the analyzer-settings validation;
     *  4. set the overall replication state to RUNNING and start the persistent
     *     index replication task;
     *  5. wait, up to `request.timeout()`, for the task state to become FOLLOWING.
     *
     * The outcome is reported through [listener] via `completeWith`, which is expected to
     * deliver the block's result (or any exception thrown inside it) to the listener.
     *
     * @param request the resume request carrying the follower index name and timeout
     * @param state the current cluster state (not read by this method)
     * @param listener receives `AcknowledgedResponse(true)` once the task reports FOLLOWING,
     *                 or `ReplicateIndexResponse(false)` if the task could not be assigned
     * @throws ResourceNotFoundException (inside the block) when replication is not resumable
     * @throws IndexNotFoundException (inside the block) when the leader settings are missing
     * @throws IllegalStateException (inside the block) when the leader index is k-NN enabled
     */
    override fun masterOperation(request: ResumeIndexReplicationRequest, state: ClusterState,
                                 listener: ActionListener<AcknowledgedResponse>) {
        launch(Dispatchers.Unconfined + threadPool.coroutineContext()) {
            listener.completeWith {
                log.info("Resuming index replication on index:${request.indexName}")
                validateResumeReplicationRequest(request)
                val replMetadata = replicationMetadataManager.getIndexReplicationMetadata(request.indexName)
                val remoteMetadata = getLeaderIndexMetadata(replMetadata.connectionName, replMetadata.leaderContext.resource)
                val params = IndexReplicationParams(replMetadata.connectionName, remoteMetadata.index, request.indexName)
                if (!isResumable(params)) {
                    throw ResourceNotFoundException("Retention lease doesn't exist. Replication can't be resumed for ${request.indexName}")
                }

                // Read the leader index's explicitly-set settings (defaults excluded) from the remote cluster.
                val remoteClient = client.getRemoteClusterClient(params.leaderAlias)
                val getSettingsRequest = GetSettingsRequest().includeDefaults(false).indices(params.leaderIndex.name)
                val settingsResponse = remoteClient.suspending(
                    remoteClient.admin().indices()::getSettings,
                    injectSecurityContext = true
                )(getSettingsRequest)

                val leaderSettings = settingsResponse.indexToSettings.get(params.leaderIndex.name)
                    ?: throw IndexNotFoundException(params.leaderIndex.name)

                // k-NN Setting is a static setting. In case the setting is changed at the leader index before resume,
                // block the resume.
                if (leaderSettings.getAsBoolean(KNN_INDEX_SETTING, false)) {
                    throw IllegalStateException("Cannot resume replication for k-NN enabled index ${params.leaderIndex.name}.")
                }

                ValidationUtil.validateAnalyzerSettings(environment, leaderSettings, replMetadata.settings)

                replicationMetadataManager.updateIndexReplicationState(request.indexName, ReplicationOverallState.RUNNING)
                val task = persistentTasksService.startTask(
                    "replication:index:${request.indexName}",
                    IndexReplicationExecutor.TASK_NAME,
                    params
                )

                if (!task.isAssigned) {
                    log.error("Failed to assign task")
                    // Return the negative response as the block's result instead of calling
                    // listener.onResponse directly: completing the listener here and then falling
                    // through would notify it a second time and wait on a task that never started.
                    return@completeWith ReplicateIndexResponse(false)
                }

                // Now wait for the replication to start and the follower index to get created before returning
                persistentTasksService.waitForTaskCondition(task.id, request.timeout()) { t ->
                    val replicationState = (t.state as IndexReplicationState?)?.state
                    replicationState == ReplicationState.FOLLOWING
                }

                AcknowledgedResponse(true)
            }
        }
    }