in connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraJoinRDD.scala [138:179]
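
  /**
   * Pairs each left-side element with the Cassandra rows matching its join key.
   * Queries are issued asynchronously and rate limited, and their results are
   * consumed through a sliding window of at most readConf.parallelismLevel
   * in-flight futures.
   */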
  private[rdd] def fetchIterator(
    session: CqlSession,
    bsb: BoundStatementBuilder[L],
    rowMetadata: CassandraRowMetadata,
    leftIterator: Iterator[L],
    metricsUpdater: InputMetricsUpdater
  ): Iterator[(L, R)] = {
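    // Caps the number of simultaneously executing asynchronous queries.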
    val queryExecutor = QueryExecutor(session, readConf.parallelismLevel, None, None)
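
    // Issues one asynchronous query for `left` and completes the returned
    // future with an iterator pairing `left` with every matching right row.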
    def pairWithRight(left: L): SettableFuture[Iterator[(L, R)]] = {
      val resultFuture = SettableFuture.create[Iterator[(L, R)]]
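      // Every right-side row is zipped against the same left element.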
      val leftSide = Iterator.continually(left)
      import com.datastax.spark.connector.util.Threads.BlockingIOExecutionContext
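
      // Bind the join key from `left` and page results at fetchSizeInRows.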
      val stmt = bsb.bind(left)
        .update(_.setPageSize(readConf.fetchSizeInRows))
        .executeAs(readConf.executeAs)
      queryExecutor.executeAsync(stmt).onComplete {
        case Success(rs) =>
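          // Prefetch the next result page while the current one is consumed.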
          val resultSet = new PrefetchingResultSetIterator(rs)
          val iteratorWithMetrics = resultSet.map(metricsUpdater.updateMetrics)
          /* This is a far from ideal place to rate limit: because these futures
             are buffered, we will most likely exceed the configured threshold. */
          val throttledIterator = iteratorWithMetrics.map(JoinHelper.maybeRateLimit(readConf))
          val rightSide = throttledIterator.map(rowReader.read(_, rowMetadata))
          resultFuture.set(leftSide.zip(rightSide))
        case Failure(throwable) =>
          resultFuture.setException(throwable)
      }
      resultFuture
    }
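
    // Throttle how quickly new queries are issued before starting each one.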
    val queryFutures = leftIterator.map { left =>
      requestsPerSecondRateLimiter.maybeSleep(1)
      pairWithRight(left)
    }
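    // Keep at most readConf.parallelismLevel query futures in flight and
    // flatten their result iterators in order.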
    JoinHelper.slidingPrefetchIterator(queryFutures, readConf.parallelismLevel).flatten
  }
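
  // Editor's sketch, not part of the original file: a minimal version of the
  // sliding-window prefetch pattern that JoinHelper.slidingPrefetchIterator
  // provides above. It assumes Guava-style futures with a blocking get();
  // the name `slidingPrefetchSketch` is hypothetical.
  private def slidingPrefetchSketch[T](
    futures: Iterator[com.google.common.util.concurrent.ListenableFuture[T]],
    window: Int
  ): Iterator[T] = new Iterator[T] {
    private val inFlight =
      scala.collection.mutable.Queue.empty[com.google.common.util.concurrent.ListenableFuture[T]]
    // Start up to `window` queries ahead of the consumer.
    private def fill(): Unit =
      while (inFlight.size < window && futures.hasNext) inFlight.enqueue(futures.next())
    fill()
    override def hasNext: Boolean = inFlight.nonEmpty
    override def next(): T = {
      val result = inFlight.dequeue().get() // block only on the oldest future
      fill()                                // keep the window full
      result
    }
  }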