override def compute()

in connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraMergeJoinRDD.scala [187:246]


  /**
    * Computes one partition of the merge join: for every token range of the partition the rows
    * of the left and the right scan RDD are fetched and merged by token.
    *
    * The sessions opened here are closed by a task completion listener, so the returned
    * iterator may be consumed lazily for the lifetime of the task.
    *
    * @param split   partition to compute; it is cast to `CassandraPartition` to read its token ranges
    * @param context task context used to create the metrics updater and to register the cleanup listener
    * @return an iterator yielding, for each element produced by the `MergeJoinIterator`, the converted
    *         left rows and the converted right rows (the token itself is dropped)
    */
  override def compute(split: Partition, context: TaskContext): Iterator[(Seq[L], Seq[R])] = {

    /**
      * Opens one shared session when both scans use an equal connector, two sessions otherwise.
      * If the second session cannot be opened, the first one is closed again before rethrowing.
      */
    def openSessions(): (CqlSession, CqlSession) = {
      if (leftScanRDD.connector == rightScanRDD.connector) {
        val session = leftScanRDD.connector.openSession()
        (session, session)
      } else {
        val left = leftScanRDD.connector.openSession()
        val right =
          try {
            rightScanRDD.connector.openSession()
          } catch {
            case scala.util.control.NonFatal(openFailure) =>
              // Do not leak the already opened left session; keep the original failure primary.
              try {
                left.close()
              } catch {
                case scala.util.control.NonFatal(closeFailure) => openFailure.addSuppressed(closeFailure)
              }
              throw openFailure
          }
        (left, right)
      }
    }

    /** Closes both sessions; a shared session is closed only once. */
    def closeSessions(leftSession: CqlSession, rightSession : CqlSession): Unit = {
      try {
        if (leftSession != rightSession) rightSession.close()
      } finally {
        // Runs even when closing the right session fails.
        leftSession.close()
      }
    }

    type V = t forSome { type t }
    type T = t forSome { type t <: com.datastax.spark.connector.rdd.partitioner.dht.Token[V] }
    // Cast before any session is opened so that an unexpected partition type cannot leak a session.
    val partition = split.asInstanceOf[CassandraPartition[V, T]]
    val tokenRanges = partition.tokenRanges

    val (leftSession, rightSession) = openSessions()

    // Task metrics are enabled as soon as either side asks for them.
    val metricsReadConf = new ReadConf(taskMetricsEnabled =
      leftScanRDD.readConf.taskMetricsEnabled || rightScanRDD.readConf.taskMetricsEnabled)

    val metricsUpdater = InputMetricsUpdater(context, metricsReadConf)

    // Built on `tokenRanges.iterator`, so a token range is only fetched once the iterator reaches it.
    val mergingIterator = tokenRanges.iterator.flatMap { tokenRange =>
      val (leftMetadata, leftRowIterator) = fetchTokenRange(leftSession, leftScanRDD, tokenRange, metricsUpdater)
      val (rightMetadata, rightRowIterator) = fetchTokenRange(rightSession, rightScanRDD, tokenRange, metricsUpdater)

      val rowMerger = new MergeJoinIterator[Row, Row, Token](
        leftRowIterator,
        rightRowIterator,
        tokenExtractor,
        tokenExtractor
      )
      rowMerger.map { case (t: Token, lRows : Seq[Row], rRows: Seq[Row]) => (
        t,
        convertRowIterator(lRows.iterator, leftScanRDD.rowReader, leftMetadata).toList,
        convertRowIterator(rRows.iterator, rightScanRDD.rowReader, rightMetadata).toList)
      }
    }

    val countingIterator = new CountingIterator(mergingIterator)

    // NOTE(review): the listener returns the context instead of Unit, presumably to select the
    // function-typed overload of addTaskCompletionListener - confirm before removing it.
    context.addTaskCompletionListener { (context) =>
      try {
        val duration = metricsUpdater.finish() / 1000000000d
        logDebug(
          f"""Fetched ${countingIterator.count} rows from
              |${leftScanRDD.keyspaceName} ${leftScanRDD.tableName} and ${rightScanRDD.tableName}
              |for partition ${partition.index} in $duration%.3f s.""".stripMargin)
      } finally {
        // Sessions must be released even if metrics finalisation or logging fails.
        closeSessions(leftSession, rightSession)
      }
      context
    }

    // Drop the token, callers only see the grouped left and right rows.
    countingIterator.map(tuple => (tuple._2, tuple._3))
  }