in connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/CassandraPartitionGenerator.scala [95:141]
private def rangeToCql(range: TokenRange): Seq[CqlTokenRange[V, T]] =
  range.unwrap.map(CqlTokenRange(_))
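
/** Groups the table's token ranges into `CassandraPartition`s, assigning each
  * partition the endpoints shared by all of its token ranges, and ordering the
  * result so that consecutive partitions prefer different endpoints. */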
def partitions: Seq[CassandraPartition[V, T]] = {
  val hostAddresses = new NodeAddresses(connector)
  val tokenRanges = describeRing
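  // Distinct endpoints across the whole ring (the replica sets of all token
  // ranges, unioned). Zero endpoints indicates no readable replicas, e.g. a
  // table view, which is rejected below.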
  val endpointCount = tokenRanges.map(_.replicas).reduce(_ ++ _).size
  if (endpointCount == 0)
    throw new IllegalArgumentException(s"Could not retrieve endpoints for the given table " +
      s"(${keyspaceName}.${tableDef.name}). Are you trying to read a table view? " +
      s"Table views are not supported; see SPARKC-612.")
  val maxGroupSize = tokenRanges.size / endpointCount
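
  // Split the ring's token ranges into smaller pieces so that the total number
  // of splits is roughly `splitCount`.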
  val splitter = createTokenRangeSplitter
  val splits = splitter.split(tokenRanges, splitCount).toSeq
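
  // Cluster the splits into groups of at most `maxGroupSize` ranges that share
  // replicas; each group will become a single Spark partition.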
  val clusterer = new TokenRangeClusterer[V, T](splitCount, maxGroupSize)
  val tokenRangeGroups = clusterer.group(splits).toArray
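
  // Build one CassandraPartition per group; its preferred endpoints are the
  // replicas common to every token range in the group, resolved to host names.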
  val partitions = for (group <- tokenRangeGroups) yield {
    val replicas = group.map(_.replicas).reduce(_ intersect _)
    val rowCount = group.map(_.rangeSize).sum
    val cqlRanges = group.flatMap(rangeToCql)
    // partition index will be set later
    CassandraPartition(0, replicas.flatMap(hostAddresses.hostNames).toArray, cqlRanges, rowCount.toLong)
  }

  // Sort the partitions and assign sequential numbers so that each partition's
  // index matches its position in the sequence (Spark requires
  // partitions(i).index == i).
  // Group partitions by their endpoint sets: the fewer endpoints a partition
  // has, the harder it is to find a local executor for it, so the groups are
  // sorted by endpoint-set size to schedule partitions with fewer endpoints
  // first. Inside each group, partitions are sorted by size, descending, so
  // that processing starts with the big ones.
  val partitionsGroupedByEndpoints = partitions
    .groupBy(_.endpoints)
    .toSeq
    .sortBy { case (endpoints, _) => endpoints.size }
    .map { case (_, group) => group.sortBy(-_.dataSize) }

  // Merge the groups round-robin to spread the load evenly across all
  // endpoints: loop over the groups, take the next element from each, and emit
  // them in order, so that consecutive partitions target different endpoints.
  val groupIterators = partitionsGroupedByEndpoints.map(_.iterator)
  val roundRobinSelectedPartitions = Iterator.continually { groupIterators.filter(_.hasNext).map(_.next) }
    .takeWhile(_.nonEmpty).flatten.toSeq
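  // For example, hypothetical groups [A1, A2, A3], [B1, B2], [C1] would be
  // emitted as A1, B1, C1, A2, B2, A3: each pass picks one partition from
  // every group that still has elements.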

  val indexedPartitions = roundRobinSelectedPartitions
    .zipWithIndex
    .map { case (p, index) => p.copy(index = index) }

  indexedPartitions
}
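
// A minimal sketch of the consumer side, with hypothetical names: an RDD built
// on top of this generator would expose the result directly as its Spark
// partitions, which is why each index must match its position in the sequence:
//
//   override def getPartitions: Array[Partition] =
//     partitionGenerator.partitions.toArray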