in src/main/scala/org/apache/spark/shuffle/RssShuffleManager.scala [315:357]
/**
 * Creates a [[ShuffleReader]] for the partition range `[startPartition, endPartition)`
 * of the given shuffle.
 *
 * Short-circuits to an [[RssEmptyShuffleReader]] when the shuffle produced no map
 * output (`numMaps == 0`), since there is nothing to fetch from the RSS servers.
 * Otherwise builds a full [[RssShuffleReader]] wired with the replica, timeout and
 * polling settings from the Spark conf.
 *
 * @param handle         the handle returned by registerShuffle; must be an
 *                       [[RssShuffleHandle]] (this manager only issues that type)
 * @param startPartition first reduce partition to read (inclusive)
 * @param endPartition   last reduce partition to read (exclusive)
 * @param context        the task context of the reading task
 * @return a reader over the requested partition range
 */
override def getReader[K, C](handle: ShuffleHandle, startPartition: Int, endPartition: Int, context: TaskContext): ShuffleReader[K, C] = {
  logInfo(s"getReader: Use ShuffleManager: ${this.getClass().getSimpleName()}, $handle, partitions: [$startPartition, $endPartition)")
  // Safe cast: registerShuffle in this manager always hands out RssShuffleHandle.
  val rssShuffleHandle = handle.asInstanceOf[RssShuffleHandle[K, _, C]]
  val shuffleInfo = new AppShuffleId(
    conf.getAppId,
    rssShuffleHandle.appAttempt,
    handle.shuffleId
  )
  // Expression-oriented branch instead of an early `return`: identical behavior,
  // idiomatic Scala.
  if (rssShuffleHandle.numMaps == 0) {
    // No map tasks wrote output for this shuffle; nothing to fetch.
    new RssEmptyShuffleReader(
      shuffleInfo,
      startPartition,
      endPartition)
  } else {
    val serializer = rssShuffleHandle.dependency.serializer
    val rssReplicas = conf.get(RssOpts.replicas)
    val rssCheckReplicaConsistency = conf.get(RssOpts.checkReplicaConsistency)
    val maxWaitMillis = conf.get(RssOpts.maxWaitTime)
    // Cached lookup avoids re-parsing the server connection strings per reader.
    val rssServers = ServerConnectionStringCache.getInstance().getServerList(rssShuffleHandle.getServerList)
    new RssShuffleReader(
      user = rssShuffleHandle.user,
      shuffleInfo = shuffleInfo,
      startPartition = startPartition,
      endPartition = endPartition,
      serializer = serializer,
      decompression = conf.get(RssOpts.compression),
      context = context,
      shuffleDependency = rssShuffleHandle.dependency,
      numMaps = rssShuffleHandle.numMaps,
      rssServers = rssServers,
      partitionFanout = rssShuffleHandle.partitionFanout,
      serviceRegistryDataCenter = dataCenter,
      serviceRegistryCluster = cluster,
      timeoutMillis = networkTimeoutMillis,
      maxRetryMillis = maxWaitMillis.toInt,
      dataAvailablePollInterval = pollInterval,
      dataAvailableWaitTime = dataAvailableWaitTime,
      shuffleReplicas = rssReplicas,
      checkShuffleReplicaConsistency = rssCheckReplicaConsistency)
  }
}