in connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraLeftJoinRDD.scala [164:206]
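  /**
   * Pairs every left-side row with the rows fetched from Cassandra for its key,
   * yielding `None` for left rows with no match (left outer join semantics).
   */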
  private[rdd] def fetchIterator(
      session: CqlSession,
      bsb: BoundStatementBuilder[L],
      rowMetadata: CassandraRowMetadata,
      leftIterator: Iterator[L],
      metricsUpdater: InputMetricsUpdater
  ): Iterator[(L, Option[R])] = {
    import com.datastax.spark.connector.util.Threads.BlockingIOExecutionContext
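    // One shared executor that caps the number of in-flight asynchronous queries
    // at readConf.parallelismLevel.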
    val queryExecutor = QueryExecutor(session, readConf.parallelismLevel, None, None)
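    // Issues the right-side query for a single left row and returns a future
    // holding the joined pairs for that row.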
    def pairWithRight(left: L): SettableFuture[Iterator[(L, Option[R])]] = {
      val resultFuture = SettableFuture.create[Iterator[(L, Option[R])]]
      val leftSide = Iterator.continually(left)
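      // `leftSide` repeats the left row indefinitely; zipping it with the finite
      // right-side iterator below pairs the row with each of its matches.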
      val stmt = bsb.bind(left)
        .update(_.setPageSize(readConf.fetchSizeInRows))
        .executeAs(readConf.executeAs)
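      // Complete a Guava SettableFuture from the driver's async callback, so the
      // caller can buffer and window these futures uniformly.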
      queryExecutor.executeAsync(stmt).onComplete {
        case Success(rs) =>
          val resultSet = new PrefetchingResultSetIterator(rs)
          val iteratorWithMetrics = resultSet.map(metricsUpdater.updateMetrics)
          /* This is a less-than-ideal place to rate limit: because these futures
             are buffered, we will most likely exceed the configured threshold. */
          val throttledIterator = iteratorWithMetrics.map(maybeRateLimit)
          val rightSide =
            if (resultSet.isEmpty) Iterator.single(None)
            else throttledIterator.map(r => Some(rowReader.read(r, rowMetadata)))
          resultFuture.set(leftSide.zip(rightSide))
        case Failure(throwable) =>
          resultFuture.setException(throwable)
      }
      resultFuture
    }
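    // Throttle how fast new queries are issued: one permit is taken from the
    // requests-per-second rate limiter before each left row is bound and executed.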
    val queryFutures = leftIterator.map { left =>
      requestsPerSecondRateLimiter.maybeSleep(1)
      pairWithRight(left)
    }
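    // Keep a sliding window of at most readConf.parallelismLevel futures in flight,
    // then flatten the per-row iterators into a single joined iterator.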
    JoinHelper.slidingPrefetchIterator(queryFutures, readConf.parallelismLevel).flatten
  }