in src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexAction.kt [59:113]
override fun doExecute(task: Task, request: ReplicateIndexRequest, listener: ActionListener<ReplicateIndexResponse>) {
launch(threadPool.coroutineContext()) {
listener.completeWith {
log.info("Setting-up replication for ${request.leaderAlias}:${request.leaderIndex} -> ${request.followerIndex}")
val user = SecurityContext.fromSecurityThreadContext(threadPool.threadContext)
val followerReplContext = ReplicationContext(request.followerIndex,
user?.overrideFgacRole(request.useRoles?.get(ReplicateIndexRequest.FOLLOWER_CLUSTER_ROLE)))
val leaderReplContext = ReplicationContext(request.leaderIndex,
user?.overrideFgacRole(request.useRoles?.get(ReplicateIndexRequest.LEADER_CLUSTER_ROLE)))
// For autofollow request, setup checks are already made during addition of the pattern with
// original user
if(!request.isAutoFollowRequest) {
val setupChecksReq = SetupChecksRequest(followerReplContext, leaderReplContext, request.leaderAlias)
val setupChecksRes = client.suspendExecute(SetupChecksAction.INSTANCE, setupChecksReq)
if(!setupChecksRes.isAcknowledged) {
log.error("Setup checks failed while triggering replication for ${request.leaderAlias}:${request.leaderIndex} -> " +
"${request.followerIndex}")
throw org.opensearch.replication.ReplicationException("Setup checks failed while setting-up replication for ${request.followerIndex}")
}
}
// Any checks on the settings is followed by setup checks to ensure all relevant changes are
// present across the plugins
// validate index metadata on the leader cluster
val leaderIndexMetadata = getLeaderIndexMetadata(request.leaderAlias, request.leaderIndex)
ValidationUtil.validateLeaderIndexMetadata(leaderIndexMetadata)
val leaderSettings = getLeaderIndexSettings(request.leaderAlias, request.leaderIndex)
if (leaderSettings.keySet().contains(ReplicationPlugin.REPLICATED_INDEX_SETTING.key) and
!leaderSettings.get(ReplicationPlugin.REPLICATED_INDEX_SETTING.key).isNullOrBlank()) {
throw IllegalArgumentException("Cannot Replicate a Replicated Index ${request.leaderIndex}")
}
if (!leaderSettings.getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.key, true)) {
throw IllegalArgumentException("Cannot Replicate an index where the setting ${IndexSettings.INDEX_SOFT_DELETES_SETTING.key} is disabled")
}
// For k-NN indices, k-NN loads its own engine and this conflicts with the replication follower engine
// Blocking k-NN indices for replication
if(leaderSettings.getAsBoolean(KNN_INDEX_SETTING, false)) {
throw IllegalArgumentException("Cannot replicate k-NN index - ${request.leaderIndex}")
}
ValidationUtil.validateAnalyzerSettings(environment, leaderSettings, request.settings)
// Setup checks are successful and trigger replication for the index
// permissions evaluation to trigger replication is based on the current security context set
val internalReq = ReplicateIndexMasterNodeRequest(user, request)
client.suspendExecute(ReplicateIndexMasterNodeAction.INSTANCE, internalReq)
ReplicateIndexResponse(true)
}
}
}