in src/main/kotlin/org/opensearch/replication/action/resume/TransportResumeIndexReplicationAction.kt [83:131]
override fun masterOperation(request: ResumeIndexReplicationRequest, state: ClusterState,
listener: ActionListener<AcknowledgedResponse>) {
launch(Dispatchers.Unconfined + threadPool.coroutineContext()) {
listener.completeWith {
log.info("Resuming index replication on index:" + request.indexName)
validateResumeReplicationRequest(request)
val replMetdata = replicationMetadataManager.getIndexReplicationMetadata(request.indexName)
val remoteMetadata = getLeaderIndexMetadata(replMetdata.connectionName, replMetdata.leaderContext.resource)
val params = IndexReplicationParams(replMetdata.connectionName, remoteMetadata.index, request.indexName)
if (!isResumable(params)) {
throw ResourceNotFoundException("Retention lease doesn't exist. Replication can't be resumed for ${request.indexName}")
}
val remoteClient = client.getRemoteClusterClient(params.leaderAlias)
val getSettingsRequest = GetSettingsRequest().includeDefaults(false).indices(params.leaderIndex.name)
val settingsResponse = remoteClient.suspending(
remoteClient.admin().indices()::getSettings,
injectSecurityContext = true
)(getSettingsRequest)
val leaderSettings = settingsResponse.indexToSettings.get(params.leaderIndex.name) ?: throw IndexNotFoundException(params.leaderIndex.name)
// k-NN Setting is a static setting. In case the setting is changed at the leader index before resume,
// block the resume.
if(leaderSettings.getAsBoolean(KNN_INDEX_SETTING, false)) {
throw IllegalStateException("Cannot resume replication for k-NN enabled index ${params.leaderIndex.name}.")
}
ValidationUtil.validateAnalyzerSettings(environment, leaderSettings, replMetdata.settings)
replicationMetadataManager.updateIndexReplicationState(request.indexName, ReplicationOverallState.RUNNING)
val task = persistentTasksService.startTask("replication:index:${request.indexName}",
IndexReplicationExecutor.TASK_NAME, params)
if (!task.isAssigned) {
log.error("Failed to assign task")
listener.onResponse(ReplicateIndexResponse(false))
}
// Now wait for the replication to start and the follower index to get created before returning
persistentTasksService.waitForTaskCondition(task.id, request.timeout()) { t ->
val replicationState = (t.state as IndexReplicationState?)?.state
replicationState == ReplicationState.FOLLOWING
}
AcknowledgedResponse(true)
}
}
}