in src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt [88:150]
/**
 * Stops replication for the index named in [request] and reports the outcome through [listener].
 *
 * The work runs in a coroutine launched on `Dispatchers.Unconfined + threadPool.coroutineContext()`
 * and performs these steps in order:
 *  1. Remove the index block (done before validation, see the note in the body).
 *  2. Run `validateStopReplicationRequest`.
 *  3. Close the index, unless a restore involving it is in progress or it is absent from the
 *     routing table of [state].
 *  4. Try to remove the retention lease on the remote cluster; a failure here is logged and ignored.
 *  5. Submit the `stop_replication` cluster state update.
 *  6. Reopen the index, under the same condition that guarded step 3.
 *  7. Delete the index replication metadata and acknowledge the request.
 *
 * Any exception thrown by these steps (other than in step 4) is logged and handed to
 * [ActionListener.onFailure].
 *
 * @param request identifies the index whose replication should be stopped.
 * @param state cluster state passed in by the caller; only its routing table is consulted.
 * @param listener receives `AcknowledgedResponse(true)` on success or the failure cause.
 */
override fun masterOperation(request: StopIndexReplicationRequest, state: ClusterState,
                             listener: ActionListener<AcknowledgedResponse>) {
    launch(Dispatchers.Unconfined + threadPool.coroutineContext()) {
        try {
            log.info("Stopping index replication on index:${request.indexName}")
            // NOTE: We remove the block first before validation since it is a harmless idempotent operation and
            // gives back control of the index even if any failure happens in one of the steps post this.
            val updateIndexBlockRequest = UpdateIndexBlockRequest(request.indexName, IndexBlockUpdateType.REMOVE_BLOCK)
            val updateIndexBlockResponse = client.suspendExecute(UpdateIndexBlockAction.INSTANCE, updateIndexBlockRequest, injectSecurityContext = true)
            if (!updateIndexBlockResponse.isAcknowledged) {
                throw OpenSearchException("Failed to remove index block on ${request.indexName}")
            }

            validateStopReplicationRequest(request)

            // Index will be deleted if replication is stopped while it is restoring. So no need to close/reopen
            val restoring = clusterService.state().custom<RestoreInProgress>(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).any { entry ->
                entry.indices().any { it == request.indexName }
            }
            if (restoring) {
                log.info("Index[${request.indexName}] is in restoring stage")
            }

            // Close and reopen are guarded by the same condition; evaluate it once so the two
            // steps can never disagree. Both operands are immutable for the duration of this call.
            val closeAndReopen = !restoring && state.routingTable.hasIndex(request.indexName)

            if (closeAndReopen) {
                val updateRequest = UpdateMetadataRequest(request.indexName, UpdateMetadataRequest.Type.CLOSE, Requests.closeIndexRequest(request.indexName))
                val closeResponse = client.suspendExecute(UpdateMetadataAction.INSTANCE, updateRequest, injectSecurityContext = true)
                if (!closeResponse.isAcknowledged) {
                    throw OpenSearchException("Unable to close index: ${request.indexName}")
                }
            }

            val replMetadata = replicationMetadataManager.getIndexReplicationMetadata(request.indexName)
            // Best effort: a failure to remove the retention lease must not abort the stop operation.
            try {
                val remoteClient = client.getRemoteClusterClient(replMetadata.connectionName)
                val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(clusterService.clusterName.value(), remoteClient)
                retentionLeaseHelper.attemptRemoveRetentionLease(clusterService, replMetadata, request.indexName)
            } catch (e: Exception) {
                log.error("Failed to remove retention lease from the leader cluster", e)
            }

            val clusterStateUpdateResponse : AcknowledgedResponse =
                clusterService.waitForClusterStateUpdate("stop_replication") { l -> StopReplicationTask(request, l) }
            if (!clusterStateUpdateResponse.isAcknowledged) {
                throw OpenSearchException("Failed to update cluster state")
            }

            // Index will be deleted if stop is called while it is restoring. So no need to reopen
            if (closeAndReopen) {
                val reopenResponse = client.suspending(client.admin().indices()::open, injectSecurityContext = true)(OpenIndexRequest(request.indexName))
                if (!reopenResponse.isAcknowledged) {
                    throw OpenSearchException("Failed to reopen index: ${request.indexName}")
                }
            }

            // Metadata is deleted last, after every other step has succeeded.
            replicationMetadataManager.deleteIndexReplicationMetadata(request.indexName)
            listener.onResponse(AcknowledgedResponse(true))
        } catch (e: Exception) {
            log.error("Stop replication failed for index[${request.indexName}] with error ${e.stackTraceToString()}")
            listener.onFailure(e)
        }
    }
}