private def getRssServers()

in src/main/scala/org/apache/spark/shuffle/RssShuffleManager.scala [376:424]


  /**
   * Picks the RSS servers for a shuffle and computes the partition fan-out across them.
   *
   * The requested server count starts from the configured maximum, is capped by an estimate
   * derived from `max(numMaps, numPartitions) / serverRatio` and by `numPartitions`, is forced
   * to at least 1, is multiplied by the configured replica count, and is finally raised to the
   * configured minimum server count if it is still below it.
   *
   * The reachable servers returned by the service registry are also registered with
   * [[MultiServerHeartbeatClient]] before the result is built.
   *
   * @param numMaps       map-side count used (together with `numPartitions`) for the estimate
   * @param numPartitions number of shuffle partitions; also caps the server count
   * @param excludeHosts  hosts passed to the registry lookup as the exclusion list
   * @return the selected servers, the replica count and the partition fan-out
   * @throws RssNoServerAvailableException if the registry lookup returns no reachable server
   */
  private def getRssServers(numMaps: Int, numPartitions: Int, excludeHosts: Seq[String]): RssServerSelectionResult = {
    val maxServerCount = conf.get(RssOpts.maxServerCount)
    val minServerCount = conf.get(RssOpts.minServerCount)
    val shuffleServerRatio = conf.get(RssOpts.serverRatio)
    val rssReplicas = conf.get(RssOpts.replicas)

    val serverCountEstimate =
      Math.ceil(Math.max(numMaps, numPartitions).doubleValue() / shuffleServerRatio).intValue()

    // Cap by the estimate and by the partition count, but never go below a single server.
    val baseServerCount =
      Math.max(Math.min(Math.min(maxServerCount, serverCountEstimate), numPartitions), 1)

    // Scale by the replica count first, then enforce the configured minimum.
    val selectedServerCount = Math.max(baseServerCount * rssReplicas, minServerCount)

    val excludeHostsJavaCollection = JavaConverters.asJavaCollectionConverter(excludeHosts).asJavaCollection
    val servers = executeWithServiceRegistry(serviceRegistry =>
      ServiceRegistryUtils.getReachableServers(
        serviceRegistry,
        selectedServerCount,
        networkTimeoutMillis,
        dataCenter,
        cluster,
        excludeHostsJavaCollection))
    if (servers.isEmpty) {
      throw new RssNoServerAvailableException("There is no reachable RSS server")
    }

    MultiServerHeartbeatClient.getInstance().addServers(servers)

    val serverArray = servers.toArray(new Array[ServerDetail](0))

    // If min server count is configured, try to distribute a single partition on multiple servers.
    // The numPartitions > 0 guard keeps the fan-out at its default of 1 for a shuffle with no
    // partitions instead of failing with an integer division by zero.
    val partitionFanout =
      if (minServerCount > 1 && numPartitions > 0) {
        val numReplicationGroups = serverArray.length / rssReplicas
        if (numReplicationGroups > numPartitions) numReplicationGroups / numPartitions else 1
      } else {
        1
      }

    RssServerSelectionResult(serverArray, rssReplicas, partitionFanout)
  }