def getRssInfoFromMapOutputTracker()

in src/main/scala/org/apache/spark/shuffle/rss/RssUtils.scala [83:123]


  def getRssInfoFromMapOutputTracker(shuffleId: Int, partition: Int, retryIntervalMillis: Long, maxRetryMillis: Long): MapOutputRssInfo = {
    // this hash map stores rss servers for each map task's latest attempt
    val mapLatestAttemptRssServers = scala.collection.mutable.HashMap[Int, MapTaskRssInfo]()
    val mapAttemptRssInfoList =
      RetryUtils.retry(retryIntervalMillis,
        retryIntervalMillis * 10,
        maxRetryMillis,
        s"get information from map output tracker, shuffleId: $shuffleId, partition: $partition",
        new Supplier[Seq[MapTaskRssInfo]] {
          override def get(): Seq[MapTaskRssInfo] = {
            val mapStatusInfo = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(shuffleId, partition, partition + 1)
            logInfo(s"Got result from mapOutputTracker.getMapSizesByExecutorId")
            mapStatusInfo.toParArray.flatMap(mapStatusInfoEntry=>RssUtils.getRssInfoFromBlockManagerId(mapStatusInfoEntry._1)).toList
          }
        })
    logInfo(s"Got ${mapAttemptRssInfoList.size} items after parsing mapOutputTracker.getMapSizesByExecutorId result")
    if (mapAttemptRssInfoList.isEmpty) {
      throw new RssInvalidMapStatusException(s"Failed to get information from map output tracker, shuffleId: $shuffleId, partition: $partition")
    }
    for (mapAttemptRssInfo <- mapAttemptRssInfoList) {
      val mapId = mapAttemptRssInfo.getMapId
      val oldValue = mapLatestAttemptRssServers.get(mapId)
      if (oldValue.isEmpty || oldValue.get.getTaskAttemptId < mapAttemptRssInfo.getTaskAttemptId) {
        mapLatestAttemptRssServers.put(mapId, mapAttemptRssInfo)
      }
    }
    val numMaps = mapLatestAttemptRssServers.size
    val numRssServersValues = mapLatestAttemptRssServers.values
      .map(_.getNumRssServers)
      .toList
      .distinct
    if (numRssServersValues.size != 1) {
      throw new RssInvalidMapStatusException(s"Got invalid number of RSS servers: $numRssServersValues")
    }
    val numRssServers = numRssServersValues.head
    val mapTaskAttemptIds = mapLatestAttemptRssServers.values
      .map(_.getTaskAttemptId)
      .toArray
      .distinct
    MapOutputRssInfo(numMaps, numRssServers, mapTaskAttemptIds)
  }