override def getReader[K, C]()

in src/main/scala/org/apache/spark/shuffle/RssShuffleManager.scala [315:357]


  override def getReader[K, C](handle: ShuffleHandle, startPartition: Int, endPartition: Int, context: TaskContext): ShuffleReader[K, C] = {
    logInfo(s"getReader: Use ShuffleManager: ${this.getClass().getSimpleName()}, $handle, partitions: [$startPartition, $endPartition)")

    val rssShuffleHandle = handle.asInstanceOf[RssShuffleHandle[K, _, C]]
    val shuffleInfo = new AppShuffleId(
      conf.getAppId,
      rssShuffleHandle.appAttempt,
      handle.shuffleId
    )

    if (rssShuffleHandle.numMaps == 0) {
      return new RssEmptyShuffleReader(
        shuffleInfo,
        startPartition,
        endPartition)
    }

    val serializer = rssShuffleHandle.dependency.serializer
    val rssReplicas = conf.get(RssOpts.replicas)
    val rssCheckReplicaConsistency = conf.get(RssOpts.checkReplicaConsistency)
    val maxWaitMillis = conf.get(RssOpts.maxWaitTime)
    val rssServers = ServerConnectionStringCache.getInstance().getServerList(rssShuffleHandle.getServerList)
    new RssShuffleReader(
      user = rssShuffleHandle.user,
      shuffleInfo = shuffleInfo,
      startPartition = startPartition,
      endPartition = endPartition,
      serializer = serializer,
      decompression = conf.get(RssOpts.compression),
      context = context,
      shuffleDependency = rssShuffleHandle.dependency,
      numMaps = rssShuffleHandle.numMaps,
      rssServers = rssServers,
      partitionFanout = rssShuffleHandle.partitionFanout,
      serviceRegistryDataCenter = dataCenter,
      serviceRegistryCluster = cluster,
      timeoutMillis = networkTimeoutMillis,
      maxRetryMillis = maxWaitMillis.toInt,
      dataAvailablePollInterval = pollInterval,
      dataAvailableWaitTime = dataAvailableWaitTime,
      shuffleReplicas = rssReplicas,
      checkShuffleReplicaConsistency = rssCheckReplicaConsistency)
  }