private def createBlockDownloaderPartitionRecordIteratorWithoutRetry()

in src/main/scala/org/apache/spark/shuffle/rss/BlockDownloaderPartitionRangeRecordIterator.scala [95:163]


  /**
   * Builds the record iterator for one shuffle partition. This method itself performs a single
   * attempt; the only retry behaviour visible here is whatever the read client does with the
   * [[ClientRetryOptions]] it is given.
   *
   * Steps: look up the partition's RSS info, compute the server replication groups, connect a
   * [[MultiServerSocketReadClient]], and wrap it in a [[BlockDownloaderPartitionRecordIterator]].
   *
   * @param partition id of the shuffle partition to read
   * @return iterator over the records downloaded for `partition`
   * @throws FetchFailedException for any failure raised inside this method (including an invalid
   *                              `shuffleReplicas` value); the original error is attached as cause
   */
  private def createBlockDownloaderPartitionRecordIteratorWithoutRetry(partition: Int): Iterator[Product2[K, C]] = {
    // Set as soon as the client object exists, so the catch block can always release it.
    var downloader: ShuffleDataReader = null
    try {
      val mapOutputRssInfo = getPartitionRssInfo(partition)

      if (shuffleReplicas >= 1) {
        val serverReplicationGroups = RssUtils.getRssServerReplicationGroups(rssServers, shuffleReplicas, partition, partitionFanout)
        logInfo(s"Creating replicated read client for partition $partition, $serverReplicationGroups")
        val appShufflePartitionId = new AppShufflePartitionId(appId, appAttempt, shuffleId, partition)

        // Resolves a server id to its current connection details through the service registry.
        val serverConnectionResolver = new ServerConnectionStringResolver {
          override def resolveConnection(serverId: String): ServerDetail = {
            // random sleep some time to avoid request spike on service registry
            // Random.nextInt rejects a non-positive bound, so skip the jitter when the poll
            // interval is zero or does not fit in a positive Int.
            val jitterBoundMillis = dataAvailablePollInterval.intValue()
            if (jitterBoundMillis > 0) {
              val random = new Random()
              val randomWaitMillis = random.nextInt(jitterBoundMillis)
              ThreadUtils.sleep(randomWaitMillis)
            }
            val lookupResult = RssServiceRegistry.executeWithServiceRegistry(serviceRegistry =>
              serviceRegistry.lookupServers(serviceRegistryDataCenter, serviceRegistryCluster, util.Arrays.asList(serverId)))
            if (lookupResult == null) {
              throw new RssServerResolveException(s"Got null when looking up server for $serverId")
            }
            // exactly one entry is expected because a single server id was requested
            if (lookupResult.size() != 1) {
              throw new RssInvalidStateException(s"Invalid result $lookupResult when looking up server for $serverId")
            }
            lookupResult.get(0)
          }
        }

        val serverConnectionRefresher = new ServerConnectionCacheUpdateRefresher(serverConnectionResolver, ServerConnectionStringCache.getInstance())
        val client = new MultiServerSocketReadClient(
          serverReplicationGroups,
          timeoutMillis,
          new ClientRetryOptions(dataAvailablePollInterval, maxRetryMillis, serverConnectionRefresher),
          user,
          appShufflePartitionId,
          new ReadClientDataOptions(JavaConverters.asJavaCollectionConverter(mapOutputRssInfo.taskAttemptIds.map(long2Long)).asJavaCollection,
            dataAvailablePollInterval,
            dataAvailableWaitTime),
          checkShuffleReplicaConsistency
        )
        // Register the client before connecting: if connect() fails, the catch block below
        // still closes it instead of leaking it.
        downloader = client
        client.connect()
        new BlockDownloaderPartitionRecordIterator(
          shuffleId,
          partition,
          serializer,
          decompression,
          downloader,
          shuffleReadMetrics)
      } else {
        throw new RssException(s"Invalid shuffle replicas: $shuffleReplicas")
      }
    } catch {
      case ex: Throwable => {
        if (downloader != null) {
          // A failure while closing must not replace the original error; keep it as suppressed.
          try {
            downloader.close()
          } catch {
            case closeEx: Exception => ex.addSuppressed(closeEx)
          }
        }
        M3Stats.addException(ex, this.getClass().getSimpleName())
        throw new FetchFailedException(
          RssUtils.createReduceTaskDummyBlockManagerId(shuffleId, partition),
          shuffleId,
          -1,
          partition,
          s"Cannot fetch shuffle $shuffleId partition $partition due to ${ExceptionUtils.getSimpleMessage(ex)}",
          ex)
      }
    }
  }