in connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraMergeJoinRDD.scala [187:246]
/**
  * Computes one Spark partition of the merge join: for every token range of the
  * partition, rows are fetched from both scans, paired up by a `MergeJoinIterator`
  * keyed on `tokenExtractor`, converted with each side's row reader, and emitted
  * as `(leftRows, rightRows)` with the token dropped.
  *
  * The returned iterator is lazy; token ranges are only fetched as it is consumed.
  * Sessions opened here are closed by a task completion listener.
  *
  * @param split   the partition to compute; cast to `CassandraPartition`
  * @param context the running task's context, used for metrics and cleanup hooks
  * @return an iterator of the left-side and right-side rows that were paired together
  */
override def compute(split: Partition, context: TaskContext): Iterator[(Seq[L], Seq[R])] = {

  /** Opens one shared session when both scans use an equal connector, otherwise one per side. */
  def openSessions(): (CqlSession, CqlSession) = {
    if (leftScanRDD.connector == rightScanRDD.connector) {
      val session = leftScanRDD.connector.openSession()
      (session, session)
    } else {
      val left = leftScanRDD.connector.openSession()
      try {
        (left, rightScanRDD.connector.openSession())
      } catch {
        // Do not leak the already opened left session when the right one cannot be opened.
        case scala.util.control.NonFatal(openError) =>
          try left.close()
          catch { case scala.util.control.NonFatal(closeError) => openError.addSuppressed(closeError) }
          throw openError
      }
    }
  }

  /** Closes both sessions, closing a shared session only once. */
  def closeSessions(leftSession: CqlSession, rightSession: CqlSession): Unit = {
    try {
      if (leftSession != rightSession) rightSession.close()
    } finally {
      // Runs even when closing the right session throws.
      leftSession.close()
    }
  }

  val (leftSession, rightSession) = openSessions()

  // Register cleanup before anything else can fail, so the sessions are released even
  // if the remaining setup below throws. The listener returns the context exactly as
  // the original listener did (presumably to select the function overload — confirm
  // before simplifying).
  context.addTaskCompletionListener { (taskContext) =>
    closeSessions(leftSession, rightSession)
    taskContext
  }

  type V = t forSome { type t }
  type T = t forSome { type t <: com.datastax.spark.connector.rdd.partitioner.dht.Token[V] }
  val partition = split.asInstanceOf[CassandraPartition[V, T]]
  val tokenRanges = partition.tokenRanges

  // Task metrics are enabled when either side asks for them.
  val metricsReadConf = new ReadConf(taskMetricsEnabled =
    leftScanRDD.readConf.taskMetricsEnabled || rightScanRDD.readConf.taskMetricsEnabled)
  val metricsUpdater = InputMetricsUpdater(context, metricsReadConf)

  val mergingIterator = tokenRanges.iterator.flatMap { tokenRange =>
    val (leftMetadata, leftRowIterator) =
      fetchTokenRange(leftSession, leftScanRDD, tokenRange, metricsUpdater)
    val (rightMetadata, rightRowIterator) =
      fetchTokenRange(rightSession, rightScanRDD, tokenRange, metricsUpdater)
    val rowMerger = new MergeJoinIterator[Row, Row, Token](
      leftRowIterator,
      rightRowIterator,
      tokenExtractor,
      tokenExtractor
    )
    rowMerger.map { case (t: Token, lRows: Seq[Row], rRows: Seq[Row]) => (
      t,
      convertRowIterator(lRows.iterator, leftScanRDD.rowReader, leftMetadata).toList,
      convertRowIterator(rRows.iterator, rightScanRDD.rowReader, rightMetadata).toList)
    }
  }
  val countingIterator = new CountingIterator(mergingIterator)

  // Metrics and logging live in their own listener; it does not touch the sessions,
  // so a failure here no longer prevents the close listener above from being the
  // one responsible for releasing them.
  context.addTaskCompletionListener { (taskContext) =>
    // finish() is divided by 1e9 and reported as seconds (presumably nanoseconds — confirm).
    val duration = metricsUpdater.finish() / 1000000000d
    logDebug(
      f"""Fetched ${countingIterator.count} rows from
         |${leftScanRDD.keyspaceName} ${leftScanRDD.tableName} and ${rightScanRDD.tableName}
         |for partition ${partition.index} in $duration%.3f s.""".stripMargin)
    taskContext
  }

  // Drop the token; callers only see the paired row groups.
  countingIterator.map(tuple => (tuple._2, tuple._3))
}