in src/main/kotlin/org/opensearch/replication/action/index/TransportReplicateIndexMasterNodeAction.kt [83:139]
override fun masterOperation(request: ReplicateIndexMasterNodeRequest, state: ClusterState,
listener: ActionListener<AcknowledgedResponse>) {
val replicateIndexReq = request.replicateIndexReq
val user = request.user
log.trace("Triggering relevant tasks to start replication for " +
"${replicateIndexReq.leaderAlias}:${replicateIndexReq.leaderIndex} -> ${replicateIndexReq.followerIndex}")
// For now this returns a response after creating the follower index and starting the replication tasks
// for each shard. If that takes too long we can start the task asynchronously and return the response first.
launch(Dispatchers.Unconfined + threadPool.coroutineContext()) {
try {
if(clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_BLOCK_START)) {
log.debug("Replication cannot be started as " +
"start block(${ReplicationPlugin.REPLICATION_FOLLOWER_BLOCK_START}) is set")
throw OpenSearchStatusException("[FORBIDDEN] Replication START block is set", RestStatus.FORBIDDEN)
}
val remoteMetadata = getRemoteIndexMetadata(replicateIndexReq.leaderAlias, replicateIndexReq.leaderIndex)
if (state.routingTable.hasIndex(replicateIndexReq.followerIndex)) {
throw IllegalArgumentException("Cant use same index again for replication. " +
"Delete the index:${replicateIndexReq.followerIndex}")
}
indexScopedSettings.validate(replicateIndexReq.settings,
false,
false)
val params = IndexReplicationParams(replicateIndexReq.leaderAlias, remoteMetadata.index, replicateIndexReq.followerIndex)
replicationMetadataManager.addIndexReplicationMetadata(replicateIndexReq.followerIndex,
replicateIndexReq.leaderAlias, replicateIndexReq.leaderIndex,
ReplicationOverallState.RUNNING, user, replicateIndexReq.useRoles?.getOrDefault(ReplicateIndexRequest.FOLLOWER_CLUSTER_ROLE, null),
replicateIndexReq.useRoles?.getOrDefault(ReplicateIndexRequest.LEADER_CLUSTER_ROLE, null), replicateIndexReq.settings)
val task = persistentTasksService.startTask("replication:index:${replicateIndexReq.followerIndex}",
IndexReplicationExecutor.TASK_NAME, params)
if (!task.isAssigned) {
log.error("Failed to assign task")
listener.onResponse(ReplicateIndexResponse(false))
}
// Now wait for the replication to start and the follower index to get created before returning
persistentTasksService.waitForTaskCondition(task.id, replicateIndexReq.timeout()) { t ->
val replicationState = (t.state as IndexReplicationState?)?.state
replicationState == ReplicationState.FOLLOWING ||
(!replicateIndexReq.waitForRestore && replicationState == ReplicationState.RESTORING)
}
listener.onResponse(AcknowledgedResponse(true))
} catch (e: Exception) {
log.error("Failed to trigger replication for ${replicateIndexReq.followerIndex} - ${e.stackTraceToString()}")
listener.onFailure(e)
}
}
}