private def rangeToCql()

in connector/src/main/scala/com/datastax/spark/connector/rdd/partitioner/CassandraPartitionGenerator.scala [95:141]


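  /** Converts a driver token range into CQL-queryable ranges. A range that
    * wraps around the ring is unwrapped into non-wrapping sub-ranges, which
    * is why a single input range may yield several CqlTokenRanges. */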
  private def rangeToCql(range: TokenRange): Seq[CqlTokenRange[V, T]] =
    range.unwrap.map(CqlTokenRange(_))

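  /** Builds the Spark partitions for the table: describes the token ring,
    * splits the ranges to reach the requested split count, clusters the
    * splits by replica set, and orders the resulting partitions so the load
    * is spread evenly across all endpoints. */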
  def partitions: Seq[CassandraPartition[V, T]] = {
    val hostAddresses = new NodeAddresses(connector)
    val tokenRanges = describeRing
    val endpointCount = tokenRanges.map(_.replicas).reduce(_ ++ _).size
    if (endpointCount == 0)
      throw new IllegalArgumentException(s"Could not retrieve endpoints for the given table " +
        s"(${keyspaceName}.${tableDef.name}), are you trying to read a table view? Table views are not supported, " +
        s"see SPARKC-612.")
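    // Cap how many token ranges the clusterer may merge into one group, so
    // partitions stay spread across the available endpoints.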
    val maxGroupSize = tokenRanges.size / endpointCount

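    // Split the ring's token ranges further, producing roughly splitCount splits.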
    val splitter = createTokenRangeSplitter
    val splits = splitter.split(tokenRanges, splitCount).toSeq

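    // Cluster the splits into groups of ranges that share replicas; each
    // group becomes one Spark partition.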
    val clusterer = new TokenRangeClusterer[V, T](splitCount, maxGroupSize)
    val tokenRangeGroups = clusterer.group(splits).toArray

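    // For each group, take the replicas common to all of its ranges, the
    // summed size estimate, and the CQL ranges to query.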
    val partitions = for (group <- tokenRangeGroups) yield {
      val replicas = group.map(_.replicas).reduce(_ intersect _)
      val rowCount = group.map(_.rangeSize).sum
      val cqlRanges = group.flatMap(rangeToCql)
      // partition index will be set later
      CassandraPartition(0, replicas.flatMap(hostAddresses.hostNames).toArray, cqlRanges, rowCount.toLong)
    }

    // Sort the partitions and assign sequential numbers so that each
    // partition's index matches its position in the sequence.

    // Group partitions that share the same endpoints. The fewer endpoints a
    // partition has, the harder it is to find a local executor for it, so
    // sort the groups by endpoint count to distribute partitions with fewer
    // endpoints first. Within each group, sort partitions by size in
    // descending order so that processing starts with the big partitions.
    val partitionsGroupedByEndpoints = partitions
      .groupBy(_.endpoints)
      .toSeq
      .sortBy(_._1.size)
      .map(_._2.sortBy(-_.dataSize))
    // Merge all the groups so that load is distributed evenly across all
    // endpoints: loop over the groups, take the next element of each, and
    // emit them in order, so that consecutive partitions target different endpoints.
    val groupIterators = partitionsGroupedByEndpoints.map(_.iterator)
    val roundRobinSelectedPartitions = Iterator.continually { groupIterators.filter(_.hasNext).map(_.next) }
      .takeWhile(_.nonEmpty).flatten.toSeq

    val indexedPartitions = roundRobinSelectedPartitions
      .zipWithIndex
      .map { case (p, index) => p.copy(index = index) }
    indexedPartitions
  }
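
For reference, a minimal self-contained sketch of the round-robin merge above. PartitionStub and the sample data are hypothetical stand-ins for CassandraPartition, but the grouping and interleaving mirror the logic in partitions:

object RoundRobinSketch extends App {
  // Hypothetical stand-in for CassandraPartition; only the fields used by
  // the grouping and ordering logic are kept.
  case class PartitionStub(endpoints: Set[String], dataSize: Long)

  val parts = Seq(
    PartitionStub(Set("a"), 10), PartitionStub(Set("a"), 30),
    PartitionStub(Set("b"), 20), PartitionStub(Set("a", "b"), 40))

  // Group by endpoints, order the groups by endpoint count, and sort each
  // group so its biggest partitions come first.
  val grouped = parts.groupBy(_.endpoints).toSeq
    .sortBy(_._1.size)
    .map(_._2.sortBy(-_.dataSize))

  // Round-robin: on every pass take the next element of each non-empty group.
  val iterators = grouped.map(_.iterator)
  val interleaved = Iterator.continually(iterators.filter(_.hasNext).map(_.next()))
    .takeWhile(_.nonEmpty).flatten.toSeq

  interleaved.foreach(println)
  // Prints one partition per endpoint set before repeating any of them,
  // e.g. Set(a), Set(b), Set(a, b), then the remaining Set(a) partition.
}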