override def getPreferredLocations()

in connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala [296:333]


  override def getPreferredLocations(split: Partition): Seq[String] =
    split.asInstanceOf[CassandraPartition[_, _]].endpoints

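  // Fetches the rows of every token range in this partition and returns them as a single lazy iterator.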
  override def compute(split: Partition, context: TaskContext): Iterator[R] = {
    val partition = split.asInstanceOf[CassandraPartition[Any, _ <: ConnectorToken[Any]]]
    val tokenRanges = partition.tokenRanges
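    // Updates the Spark task's input metrics for each row read.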
    val metricsUpdater = InputMetricsUpdater(context, readConf)

    val columnNames = selectedColumnRefs.map(_.selectedAs).toIndexedSeq

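    // Obtain a scanner from the connection factory and assemble the query parts
    // (selected columns, WHERE clause, limit, clustering order) reused for every token range.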
    val scanner = connector.connectionFactory.getScanner(readConf, connector.conf, columnNames)
    val queryParts = CqlQueryParts(selectedColumnRefs, where, limit, clusteringOrder)

    // The flatMap call flattens the iterator-of-iterators into a single row iterator.
    // Because flatMap on an Iterator is lazy, the query for the next token range is not executed
    // until all rows returned by the previous query have been consumed.
    val rowIterator = tokenRanges.iterator.flatMap { range =>
      try {
        val scanResult = ScanHelper.fetchTokenRange(scanner, tableDef, queryParts, range, consistencyLevel, fetchSize)
        val iteratorWithMetrics = scanResult.rows.map(metricsUpdater.updateMetrics)
        val result = iteratorWithMetrics.map(rowReader.read(_, scanResult.metadata))
        result
      } catch {
        case t: Throwable =>
          throw new IOException(s"Exception during scan execution for $range : ${t.getMessage}", t)
      }
    }
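    // Counts the rows as they are consumed and enforces the optional row limit.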
    val countingIterator = new CountingIterator(rowIterator, limitForIterator(limit))

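    // On task completion: finalize metrics, log the row count and scan duration, and close the scanner.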
    context.addTaskCompletionListener { context =>
      val duration = metricsUpdater.finish() / 1000000000d
      logDebug(f"Fetched ${countingIterator.count} rows from $keyspaceName.$tableName " +
        f"for partition ${partition.index} in $duration%.3f s.")
      scanner.close()
      context
    }
    countingIterator
  }
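
The laziness described in the comment above is easy to demonstrate outside the connector. The following standalone sketch (hypothetical names, not part of the codebase) mimics the structure of compute: an iterator of token ranges is flatMapped into per-range row iterators, and the println calls show that the "query" for a range runs only once every row of the previous range has been consumed.

  object LazyFlatMapDemo extends App {
    val tokenRanges = Iterator("range-1", "range-2", "range-3")

    // Stand-in for ScanHelper.fetchTokenRange: pretends to execute a query and returns a row iterator.
    def fetchTokenRange(range: String): Iterator[String] = {
      println(s"executing query for $range")
      Iterator(s"$range/row-1", s"$range/row-2")
    }

    // flatMap on Iterator is lazy: fetchTokenRange is invoked only when the previous
    // range's rows have been exhausted and the next element is requested.
    val rows = tokenRanges.flatMap(fetchTokenRange)

    rows.foreach(println)
    // "executing query for range-2" is printed only after both rows of range-1,
    // confirming that queries are issued one range at a time.
  }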