private[rdd] def fetchIterator()

in connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraJoinRDD.scala [138:179]
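
Joins each row of the left-side iterator against Cassandra: for every left row it asynchronously executes the bound right-side query, pairs each fetched row with the left row that produced it, and keeps up to readConf.parallelismLevel queries in flight via a sliding prefetch window.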


  private[rdd] def fetchIterator(
    session: CqlSession,
    bsb: BoundStatementBuilder[L],
    rowMetadata: CassandraRowMetadata,
    leftIterator: Iterator[L],
    metricsUpdater: InputMetricsUpdater
  ): Iterator[(L, R)] = {

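    // Runs queries asynchronously, allowing at most readConf.parallelismLevel in flight at once.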
    val queryExecutor = QueryExecutor(session, readConf.parallelismLevel, None, None)

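    // Executes the right-side query for one left row and returns a future of the joined pairs.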
    def pairWithRight(left: L): SettableFuture[Iterator[(L, R)]] = {
      val resultFuture = SettableFuture.create[Iterator[(L, R)]]
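      // Repeat the left row so it can be zipped with every matching right-side row.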
      val leftSide = Iterator.continually(left)

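      // Implicit ExecutionContext (the connector's blocking-I/O pool) for the onComplete callback below.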
      import com.datastax.spark.connector.util.Threads.BlockingIOExecutionContext

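      // Bind the left row's join values, set the fetch page size, and run as the configured user.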
      val stmt = bsb.bind(left)
        .update(_.setPageSize(readConf.fetchSizeInRows))
        .executeAs(readConf.executeAs)
      queryExecutor.executeAsync(stmt).onComplete {
        case Success(rs) =>
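          // Iterate the result set, fetching the next page in the background while the current one is consumed.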
          val resultSet = new PrefetchingResultSetIterator(rs)
          val iteratorWithMetrics = resultSet.map(metricsUpdater.updateMetrics)
          /* Note: this is a less-than-ideal place to rate limit. Because these
           * futures are buffered, we will most likely exceed our threshold. */
          val throttledIterator = iteratorWithMetrics.map(JoinHelper.maybeRateLimit(readConf))
          val rightSide = throttledIterator.map(rowReader.read(_, rowMetadata))
          resultFuture.set(leftSide.zip(rightSide))
        case Failure(throwable) =>
          resultFuture.setException(throwable)
      }

      resultFuture
    }

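    // Throttle the rate at which new queries are issued.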
    val queryFutures = leftIterator.map { left =>
      requestsPerSecondRateLimiter.maybeSleep(1)
      pairWithRight(left)
    }

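    // Keep a window of readConf.parallelismLevel queries running ahead of the consumer.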
    JoinHelper.slidingPrefetchIterator(queryFutures, readConf.parallelismLevel).flatten
  }
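
The throughput of this method hinges on JoinHelper.slidingPrefetchIterator, which consumes the lazily mapped queryFutures so that up to readConf.parallelismLevel queries run concurrently: because the map over leftIterator is lazy, pulling a future out of the iterator is what actually launches its query. The connector's own implementation is not shown here; the following is a minimal sketch of that sliding-window pattern, with the name slidingPrefetchSketch, its body, and the windowSize parameter invented for illustration.

  import scala.collection.mutable
  import com.google.common.util.concurrent.SettableFuture

  // Sketch only: keep up to windowSize futures in flight, yielding each
  // result as it is consumed. Not the connector's actual JoinHelper code.
  def slidingPrefetchSketch[T](futures: Iterator[SettableFuture[T]], windowSize: Int): Iterator[T] = {
    val window = mutable.Queue.empty[SettableFuture[T]]
    // Pull the first windowSize futures so their queries start immediately.
    while (window.size < windowSize && futures.hasNext) window.enqueue(futures.next())

    new Iterator[T] {
      override def hasNext: Boolean = window.nonEmpty
      override def next(): T = {
        val result = window.dequeue().get()                 // block until the oldest query finishes
        if (futures.hasNext) window.enqueue(futures.next()) // start one more query
        result
      }
    }
  }

Dequeuing in FIFO order preserves the order of the left iterator, which is why fetchIterator can simply flatten the per-row result iterators at the end.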