in src/main/scala/org/apache/spark/shuffle/RssShuffleManager.scala [376:424]
/**
 * Selects the RSS servers to use for a shuffle and derives the partition fanout.
 *
 * The requested server count is computed as follows:
 *  1. start from the configured maximum server count;
 *  2. cap it by `ceil(max(numMaps, numPartitions) / serverRatio)` and by `numPartitions`;
 *  3. raise it to at least 1;
 *  4. multiply it by the configured replica count;
 *  5. raise it to at least the configured minimum server count.
 *
 * The servers returned by the service registry lookup are also added to the
 * [[MultiServerHeartbeatClient]] singleton before the result is built.
 *
 * @param numMaps       number of map tasks of the shuffle
 * @param numPartitions number of shuffle partitions
 * @param excludeHosts  hosts handed to the reachable-server lookup as the exclusion list
 * @return the selected servers, the replica count and the partition fanout
 * @throws RssNoServerAvailableException if the lookup returns no server
 */
private def getRssServers(numMaps: Int, numPartitions: Int, excludeHosts: Seq[String]): RssServerSelectionResult = {
  val maxServerCount = conf.get(RssOpts.maxServerCount)
  val minServerCount = conf.get(RssOpts.minServerCount)
  val shuffleServerRatio = conf.get(RssOpts.serverRatio)
  val rssReplicas = conf.get(RssOpts.replicas)

  // Estimate how many servers the larger side of the shuffle (maps or partitions) needs.
  val serverCountEstimate =
    Math.ceil(Math.max(numMaps, numPartitions).doubleValue() / shuffleServerRatio).intValue()

  // Never request more servers than the configured maximum, the estimate, or the
  // number of partitions, but always request at least one.
  val cappedServerCount = Math.min(Math.min(maxServerCount, serverCountEstimate), numPartitions)
  val baseServerCount = Math.max(cappedServerCount, 1)

  // Scale by the replica count, then honour the configured minimum.
  val selectedServerCount = Math.max(baseServerCount * rssReplicas, minServerCount)

  val excludeHostsJavaCollection = JavaConverters.asJavaCollectionConverter(excludeHosts).asJavaCollection
  val servers = executeWithServiceRegistry(serviceRegistry =>
    ServiceRegistryUtils.getReachableServers(
      serviceRegistry,
      selectedServerCount,
      networkTimeoutMillis,
      dataCenter,
      cluster,
      excludeHostsJavaCollection))
  if (servers.isEmpty) {
    throw new RssNoServerAvailableException("There is no reachable RSS server")
  }

  MultiServerHeartbeatClient.getInstance().addServers(servers)

  val serverArray = servers.toArray(new Array[ServerDetail](0))

  // If min server count is configured, try to distribute a single partition on multiple servers.
  // The numPartitions > 0 guard avoids a divide-by-zero (and a non-positive fanout) for a
  // shuffle that has no partitions; the fanout then stays at its default of 1.
  val partitionFanout =
    if (minServerCount > 1 && numPartitions > 0) {
      val numReplicationGroups = serverArray.length / rssReplicas
      if (numReplicationGroups > numPartitions) numReplicationGroups / numPartitions else 1
    } else {
      1
    }

  RssServerSelectionResult(serverArray, rssReplicas, partitionFanout)
}