in connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala [296:333]
  override def getPreferredLocations(split: Partition): Seq[String] =
    split.asInstanceOf[CassandraPartition[_, _]].endpoints

  override def compute(split: Partition, context: TaskContext): Iterator[R] = {
    val partition = split.asInstanceOf[CassandraPartition[Any, _ <: ConnectorToken[Any]]]
    val tokenRanges = partition.tokenRanges
    val metricsUpdater = InputMetricsUpdater(context, readConf)

    val columnNames = selectedColumnRefs.map(_.selectedAs).toIndexedSeq
    val scanner = connector.connectionFactory.getScanner(readConf, connector.conf, columnNames)
    val queryParts = CqlQueryParts(selectedColumnRefs, where, limit, clusteringOrder)

    // The Iterator#flatMap trick flattens the iterator-of-iterators structure into a single iterator.
    // flatMap on an iterator is lazy, so the query for the next token range is not executed until
    // all of the rows returned by the previous query have been consumed.
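    // A minimal sketch of that laziness (illustrative only, not executed here): assuming a
    // hypothetical `fetch: range => Iterator[Row]` standing in for ScanHelper.fetchTokenRange,
    //   Iterator(range1, range2).flatMap(fetch)
    // does not invoke fetch(range1) until the combined iterator is first pulled, and does not
    // invoke fetch(range2) until the rows produced for range1 have been exhausted.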
    val rowIterator = tokenRanges.iterator.flatMap { range =>
      try {
        val scanResult = ScanHelper.fetchTokenRange(scanner, tableDef, queryParts, range, consistencyLevel, fetchSize)
        val iteratorWithMetrics = scanResult.rows.map(metricsUpdater.updateMetrics)
        val result = iteratorWithMetrics.map(rowReader.read(_, scanResult.metadata))
        result
      } catch {
        case t: Throwable =>
          throw new IOException(s"Exception during scan execution for $range : ${t.getMessage}", t)
      }
    }
    val countingIterator = new CountingIterator(rowIterator, limitForIterator(limit))

    // When the task completes, finalize the input metrics, log the row count and scan duration,
    // and close the scanner.
    context.addTaskCompletionListener { context =>
      val duration = metricsUpdater.finish() / 1000000000d
      logDebug(f"Fetched ${countingIterator.count} rows from $keyspaceName.$tableName " +
        f"for partition ${partition.index} in $duration%.3f s.")
      scanner.close()
      context
    }
    countingIterator
  }
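
  // Usage sketch (illustrative, not part of this file): getPreferredLocations and compute are
  // invoked by Spark when an action runs on a CassandraTableScanRDD, typically obtained via the
  // connector's SparkContext extension. The keyspace and table names below are hypothetical:
  //   import com.datastax.spark.connector._
  //   val rdd = sc.cassandraTable("test_ks", "test_table")
  //   rdd.count()   // one task per CassandraPartition; each task runs compute() above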