private[rdd] def fetchIterator()

in connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraLeftJoinRDD.scala [164:206]


  /**
   * Builds the left-join result iterator: each element of `leftIterator` is paired with
   * the Cassandra rows matching it, or with `None` when no rows match (left-join semantics).
   *
   * Queries are issued asynchronously with at most `readConf.parallelismLevel` in flight
   * at once (enforced by `JoinHelper.slidingPrefetchIterator`), and the issue rate is
   * throttled by `requestsPerSecondRateLimiter`.
   *
   * @param session        live CQL session used to execute the bound statements
   * @param bsb            builder producing a bound statement for each left-side item
   * @param rowMetadata    metadata used by `rowReader` to decode right-side rows
   * @param leftIterator   the left-side items to join
   * @param metricsUpdater input-metrics hook applied to every fetched row
   * @return lazy iterator of (left, Some(right)) pairs, or (left, None) for unmatched lefts
   */
  private[rdd] def fetchIterator(
    session: CqlSession,
    bsb: BoundStatementBuilder[L],
    rowMetadata: CassandraRowMetadata,
    leftIterator: Iterator[L],
    metricsUpdater: InputMetricsUpdater
  ): Iterator[(L, Option[R])] = {
    import com.datastax.spark.connector.util.Threads.BlockingIOExecutionContext

    val queryExecutor = QueryExecutor(session, readConf.parallelismLevel, None, None)

    /** Launches the async query for `left`; the future yields the joined pairs for it. */
    def pairWithRight(left: L): SettableFuture[Iterator[(L, Option[R])]] = {
      val resultFuture = SettableFuture.create[Iterator[(L, Option[R])]]
      // Repeat the left element indefinitely so it can be zipped against every right row;
      // zip stops when the (finite) right side is exhausted.
      val leftSide = Iterator.continually(left)

      val stmt = bsb.bind(left)
        .update(_.setPageSize(readConf.fetchSizeInRows))
        .executeAs(readConf.executeAs)
      queryExecutor.executeAsync(stmt).onComplete {
        case Success(rs) =>
          val resultSet = new PrefetchingResultSetIterator(rs)
          val iteratorWithMetrics = resultSet.map(metricsUpdater.updateMetrics)
          /* This is a much less than ideal place to actually rate limit, we are buffering
          these futures this means we will most likely exceed our threshold */
          val throttledIterator = iteratorWithMetrics.map(maybeRateLimit)
          // Left-join semantics: an unmatched left element still produces one (left, None) pair.
          val rightSide =
            if (resultSet.isEmpty) Iterator.single(None)
            else throttledIterator.map(r => Some(rowReader.read(r, rowMetadata)))
          resultFuture.set(leftSide.zip(rightSide))
        case Failure(throwable) =>
          resultFuture.setException(throwable)
      }

      resultFuture
    }

    val queryFutures = leftIterator.map { left =>
      // Throttle how fast new queries are issued before launching each async request.
      requestsPerSecondRateLimiter.maybeSleep(1)
      pairWithRight(left)
    }
    // Keep a bounded window of in-flight futures and flatten their per-left result iterators.
    JoinHelper.slidingPrefetchIterator(queryFutures, readConf.parallelismLevel).flatten
  }