in src/main/scala/org/apache/spark/shuffle/rss/BlockDownloaderPartitionRangeRecordIterator.scala [95:163]
/**
 * Creates a record iterator for one shuffle partition, backed by a connected
 * replicated read client.
 *
 * This method does not retry by itself: any failure while building or
 * connecting the client is reported to M3Stats and rethrown as a
 * [[FetchFailedException]]. Retry behaviour inside the client is governed by
 * the `ClientRetryOptions` passed to it.
 *
 * @param partition the shuffle partition to read
 * @return an iterator over the records of `partition`
 * @throws FetchFailedException if anything goes wrong, including
 *                              `shuffleReplicas` being smaller than 1; the
 *                              original failure is attached as the cause
 */
private def createBlockDownloaderPartitionRecordIteratorWithoutRetry(partition: Int): Iterator[Product2[K, C]] = {
  // Tracks the client that must be closed if setup fails part-way through.
  var downloader: ShuffleDataReader = null
  try {
    val mapOutputRssInfo = getPartitionRssInfo(partition)
    if (shuffleReplicas >= 1) {
      val serverReplicationGroups =
        RssUtils.getRssServerReplicationGroups(rssServers, shuffleReplicas, partition, partitionFanout)
      logInfo(s"Creating replicated read client for partition $partition, $serverReplicationGroups")
      val appShufflePartitionId = new AppShufflePartitionId(appId, appAttempt, shuffleId, partition)

      // Looks up the current connection details of a single server in the service registry.
      val serverConnectionResolver = new ServerConnectionStringResolver {
        override def resolveConnection(serverId: String): ServerDetail = {
          // Sleep a random amount of time (below the poll interval) to avoid a
          // request spike on the service registry. Random.nextInt rejects a
          // non-positive bound, so the jitter is skipped in that case.
          val maxWaitMillis = dataAvailablePollInterval.intValue()
          if (maxWaitMillis > 0) {
            val randomWaitMillis = new Random().nextInt(maxWaitMillis)
            ThreadUtils.sleep(randomWaitMillis)
          }
          val lookupResult = RssServiceRegistry.executeWithServiceRegistry(serviceRegistry =>
            serviceRegistry.lookupServers(serviceRegistryDataCenter, serviceRegistryCluster, util.Arrays.asList(serverId)))
          if (lookupResult == null) {
            throw new RssServerResolveException(s"Got null when looking up server for $serverId")
          }
          // Exactly one server id was requested, so exactly one entry is expected back.
          if (lookupResult.size() != 1) {
            throw new RssInvalidStateException(s"Invalid result $lookupResult when looking up server for $serverId")
          }
          lookupResult.get(0)
        }
      }
      val serverConnectionRefresher =
        new ServerConnectionCacheUpdateRefresher(serverConnectionResolver, ServerConnectionStringCache.getInstance())
      val client = new MultiServerSocketReadClient(
        serverReplicationGroups,
        timeoutMillis,
        new ClientRetryOptions(dataAvailablePollInterval, maxRetryMillis, serverConnectionRefresher),
        user,
        appShufflePartitionId,
        new ReadClientDataOptions(
          JavaConverters.asJavaCollectionConverter(mapOutputRssInfo.taskAttemptIds.map(long2Long)).asJavaCollection,
          dataAvailablePollInterval,
          dataAvailableWaitTime),
        checkShuffleReplicaConsistency
      )
      // Register the client for cleanup BEFORE connecting, so that a failed
      // connect() does not leak whatever the client has already opened.
      downloader = client
      client.connect()
      new BlockDownloaderPartitionRecordIterator(
        shuffleId,
        partition,
        serializer,
        decompression,
        downloader,
        shuffleReadMetrics)
    } else {
      throw new RssException(s"Invalid shuffle replicas: $shuffleReplicas")
    }
  } catch {
    case ex: Throwable => {
      if (downloader != null) {
        // Best-effort cleanup: a failure while closing must not replace the
        // original cause or prevent the FetchFailedException below.
        try {
          downloader.close()
        } catch {
          case scala.util.control.NonFatal(closeEx) =>
            // addSuppressed rejects self-suppression, hence the identity check.
            if (closeEx ne ex) {
              ex.addSuppressed(closeEx)
            }
        }
      }
      M3Stats.addException(ex, this.getClass().getSimpleName())
      throw new FetchFailedException(
        RssUtils.createReduceTaskDummyBlockManagerId(shuffleId, partition),
        shuffleId,
        -1,
        partition,
        s"Cannot fetch shuffle $shuffleId partition $partition due to ${ExceptionUtils.getSimpleMessage(ex)}",
        ex)
    }
  }
}