suspend fun restoreShardUsingMultiChunkTransfer()

in src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRepository.kt [266:337]


    /**
     * Restores a follower shard by copying the leader shard's store files with a
     * [RemoteClusterMultiChunkTransfer].
     *
     * Steps:
     * 1. Resolve the node holding the leader primary shard from the leader cluster state and fetch
     *    the leader store's metadata snapshot under a freshly generated restore UUID.
     * 2. If the snapshot lists no files, create an empty store; otherwise start the multi-chunk
     *    transfer of the listed files.
     *
     * On success, and on failures that are not retried, the store reference is dropped with
     * `store.decRef()`, `releaseLeaderResources` is called for this restore UUID and [listener] is
     * notified. On [NodeDisconnectedException], [NodeNotConnectedException] or
     * [ConnectTransportException] the restore is re-submitted through `restoreShardWithRetries`
     * after a one second pause, and none of that cleanup runs for the failed attempt.
     *
     * NOTE(review): `store.decRef()` presumably balances an `incRef()` taken by the caller — confirm.
     * NOTE(review): exceptions thrown before the transfer is started propagate to the caller without
     * notifying [listener]; presumably `restoreShardWithRetries` deals with them — confirm.
     *
     * @param store follower shard store that receives the files
     * @param snapshotId forwarded unchanged to the retry path
     * @param indexId its name is used to fetch the leader cluster state and the leader index UUID
     * @param snapshotShardId supplies the leader index name and shard number to restore from
     * @param recoveryState handed to the multi-chunk transfer
     * @param listener notified on success, or with the failure when this attempt is not retried
     */
    suspend fun restoreShardUsingMultiChunkTransfer(store: Store, snapshotId: SnapshotId, indexId: IndexId,
                                                    snapshotShardId: ShardId,
                                                    recoveryState: RecoveryState, listener: ActionListener<Void>) {

        val followerShardId = store.shardId()
        val followerIndexName = followerShardId.indexName
        // 1. Get all the files info from the leader cluster for this shardId
        // Node containing the shard
        val leaderClusterState = getLeaderClusterState(true, true, indexId.name)
        val leaderShardRouting = leaderClusterState.routingTable.shardRoutingTable(snapshotShardId.indexName,
                snapshotShardId.id).primaryShard()
        val leaderShardNode = leaderClusterState.nodes.get(leaderShardRouting.currentNodeId())
        // Get the index UUID of the leader cluster for the metadata request
        val leaderShardId = ShardId(snapshotShardId.indexName,
                leaderClusterState.metadata.index(indexId.name).indexUUID,
                snapshotShardId.id)
        val restoreUUID = UUIDs.randomBase64UUID()

        // Cleanup shared by every terminal (non-retry) path: drop the store reference and
        // release the leader-side resources of this restore attempt.
        fun releaseRestoreResources() {
            store.decRef()
            releaseLeaderResources(restoreUUID, leaderShardNode, leaderShardId, followerShardId, followerIndexName)
        }

        val getStoreMetadataRequest = GetStoreMetadataRequest(restoreUUID, leaderShardNode, leaderShardId,
                clusterService.clusterName.value(), followerShardId)

        // Gets the remote store metadata
        val metadataResponse = executeActionOnRemote(GetStoreMetadataAction.INSTANCE, getStoreMetadataRequest, followerIndexName)
        val metadataSnapshot = metadataResponse.metadataSnapshot

        val replMetadata = getReplicationMetadata(followerIndexName)
        // 2. Request for individual files from leader cluster for this shardId
        // make sure the store is not released until we are done.
        val fileMetadata = ArrayList(metadataSnapshot.asMap().values)
        val multiChunkTransfer = RemoteClusterMultiChunkTransfer(log, clusterService.clusterName.value(), client.threadPool().threadContext,
                store, replicationSettings.concurrentFileChunks, restoreUUID, replMetadata, leaderShardNode,
                leaderShardId, fileMetadata, leaderClusterClient, recoveryState, replicationSettings.chunkSize,
                object : ActionListener<Void> {
                    override fun onFailure(e: java.lang.Exception?) {
                        log.error("Restore of ${store.shardId()} failed due to ${e?.stackTraceToString()}")
                        if (e is NodeDisconnectedException || e is NodeNotConnectedException || e is ConnectTransportException) {
                            log.info("Retrying restore shard for ${store.shardId()}")
                            launch(Dispatchers.IO + leaderClusterClient.threadPool().coroutineContext()) {
                                // Pause inside the IO coroutine rather than on the thread that
                                // delivered this callback, so the callback returns immediately.
                                Thread.sleep(1000) // to get updated leader cluster state
                                restoreShardWithRetries(store, snapshotId, indexId, snapshotShardId,
                                        recoveryState, listener, ::restoreShardUsingMultiChunkTransfer, log = log)
                            }
                        } else {
                            log.error("Not retrying restore shard for ${store.shardId()}")
                            releaseRestoreResources()
                            listener.onFailure(e)
                        }
                    }

                    override fun onResponse(response: Void?) {
                        log.info("Restore successful for ${store.shardId()}")
                        releaseRestoreResources()
                        listener.onResponse(null)
                    }
                })
        if (fileMetadata.isEmpty()) {
            // Nothing to copy from the leader: initialise an empty store and finish right away.
            log.info("Initializing with empty store for shard:${snapshotShardId.id}")
            store.createEmpty(store.indexSettings().indexVersionCreated.luceneVersion)
            releaseRestoreResources()
            listener.onResponse(null)
        } else {
            multiChunkTransfer.start()
        }
    }