in src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRepository.kt [266:337]
/**
 * Restores one follower shard by copying the leader shard's store files in chunks.
 *
 * Steps:
 *  1. Look up the leader cluster state, the node holding the primary of [snapshotShardId], and the
 *     leader index UUID.
 *  2. Ask that node (via [GetStoreMetadataAction]) for its store metadata snapshot, tagged with a
 *     freshly generated restore UUID.
 *  3. If the leader reports no files, create an empty follower store. Otherwise start a
 *     [RemoteClusterMultiChunkTransfer] that fetches the files and reports back through a listener.
 *
 * Cleanup happens on success, on a non-retryable failure, and on the empty-store path. In each case
 * this method calls `store.decRef()`, releases the leader-side resources registered under the restore
 * UUID, and then notifies [listener].
 * NOTE(review): this assumes the caller took a reference on [store] before invoking this method — confirm.
 *
 * Some transfer failures are retried: [NodeDisconnectedException], [NodeNotConnectedException] and
 * [ConnectTransportException]. For these, [restoreShardWithRetries] is re-entered after a one second
 * pause so that an updated leader cluster state is fetched. The store reference is kept for the retry.
 *
 * @param store follower shard store being restored into
 * @param snapshotId snapshot identifier, forwarded unchanged on retry
 * @param indexId leader index identifier; its name is used to fetch leader cluster state and index metadata
 * @param snapshotShardId leader-side shard id (index name + shard number)
 * @param recoveryState recovery state handed to the chunk transfer
 * @param listener notified once the restore finishes, or fails without being retried
 */
suspend fun restoreShardUsingMultiChunkTransfer(store: Store, snapshotId: SnapshotId, indexId: IndexId,
                                                snapshotShardId: ShardId,
                                                recoveryState: RecoveryState, listener: ActionListener<Void>) {
    val followerShardId = store.shardId()
    val followerIndexName = followerShardId.indexName

    // 1. Locate the leader node that currently holds the primary copy of this shard.
    val leaderClusterState = getLeaderClusterState(true, true, indexId.name)
    val leaderShardRouting = leaderClusterState.routingTable
            .shardRoutingTable(snapshotShardId.indexName, snapshotShardId.id)
            .primaryShard()
    val leaderShardNode = leaderClusterState.nodes.get(leaderShardRouting.currentNodeId())
    // The metadata request must carry the leader's index UUID, not the follower's.
    val leaderShardId = ShardId(snapshotShardId.indexName,
            leaderClusterState.metadata.index(indexId.name).indexUUID,
            snapshotShardId.id)

    // A fresh UUID identifies this restore session on the leader so its resources can be released later.
    val restoreUUID = UUIDs.randomBase64UUID()
    val getStoreMetadataRequest = GetStoreMetadataRequest(restoreUUID, leaderShardNode, leaderShardId,
            clusterService.clusterName.value(), followerShardId)
    // Fetches the leader's store metadata (the list of files to copy).
    val metadataResponse = executeActionOnRemote(GetStoreMetadataAction.INSTANCE, getStoreMetadataRequest, followerIndexName)
    val fileMetadata = ArrayList(metadataResponse.metadataSnapshot.asMap().values)
    // Fetched once, before branching (the original fetched it a second time, unused, on the transfer path).
    val replMetadata = getReplicationMetadata(followerIndexName)

    // Drops our reference on the store and frees the leader-side restore session.
    fun releaseResources() {
        store.decRef()
        releaseLeaderResources(restoreUUID, leaderShardNode, leaderShardId, followerShardId, followerIndexName)
    }

    if (fileMetadata.isEmpty()) {
        log.info("Initializing with empty store for shard:" + snapshotShardId.id)
        store.createEmpty(store.indexSettings().indexVersionCreated.luceneVersion)
        releaseResources()
        listener.onResponse(null)
        return
    }

    // 2. Request the individual files from the leader; the store stays referenced until the listener fires.
    val multiChunkTransfer = RemoteClusterMultiChunkTransfer(log, clusterService.clusterName.value(), client.threadPool().threadContext,
            store, replicationSettings.concurrentFileChunks, restoreUUID, replMetadata, leaderShardNode,
            leaderShardId, fileMetadata, leaderClusterClient, recoveryState, replicationSettings.chunkSize,
            object : ActionListener<Void> {
                override fun onFailure(e: java.lang.Exception?) {
                    log.error("Restore of ${store.shardId()} failed due to ${e?.stackTraceToString()}")
                    if (e is NodeDisconnectedException || e is NodeNotConnectedException || e is ConnectTransportException) {
                        log.info("Retrying restore shard for ${store.shardId()}")
                        // NOTE(review): the leader session for this restoreUUID is not released here;
                        // presumably the leader expires it — confirm. Also confirm that re-entering
                        // restoreShardWithRetries keeps the total number of retries bounded.
                        launch(Dispatchers.IO + leaderClusterClient.threadPool().coroutineContext()) {
                            // Pause so the retry picks up an updated leader cluster state. Suspending here
                            // avoids blocking the thread that delivered this failure callback.
                            kotlinx.coroutines.delay(1000)
                            restoreShardWithRetries(store, snapshotId, indexId, snapshotShardId,
                                    recoveryState, listener, ::restoreShardUsingMultiChunkTransfer, log = log)
                        }
                    } else {
                        log.error("Not retrying restore shard for ${store.shardId()}")
                        releaseResources()
                        listener.onFailure(e)
                    }
                }

                override fun onResponse(response: Void?) {
                    log.info("Restore successful for ${store.shardId()}")
                    releaseResources()
                    listener.onResponse(null)
                }
            })
    multiChunkTransfer.start()
}