override protected def doExecute()

in connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinExec.scala [144:221]


  /**
   * Executes the direct join by projecting every row of `keySource` to an UnsafeRow and using
   * it as the key of a lookup against the Cassandra table.
   *
   * LeftOuter / RightOuter joins are executed with [[CassandraLeftJoinRDD]]; a key row with no
   * Cassandra match is paired with an all-null row of `cassandraPlan.output.length` columns.
   * Every other join type is executed with [[CassandraJoinRDD]], which yields matched pairs only.
   *
   * Each (key row, Cassandra row) pair is combined through `createResultProjection` and then
   * filtered with `boundCondition`; `numOutputRows` is incremented only for rows that pass the
   * filter, i.e. for rows actually emitted.
   *
   * @return the joined, projected and filtered rows
   */
  override protected def doExecute(): RDD[InternalRow] = {
    /* UnsafeRows are pointers to spots in memory, and an UnsafeProjection rewrites the row it
     * returned previously when it is applied to the next element. Because the join RDDs issue
     * their Cassandra reads asynchronously, the join key a row points to would be lost unless
     * every projected row is copied.
     *
     * see @UnsafeRow.copy()
     * see @UnsafeProjection
     */
    val unsafeKeyRows: RDD[UnsafeRow] = keySource
      .execute()
      .mapPartitions { it =>
        val projection = UnsafeProjection.create(keySource.schema)
        it.map(row => projection.apply(row).copy())
      }

    implicit val rwf = new UnsafeRowWriterFactory(joinExpressions)
    implicit val rrf = new UnsafeRowReaderFactory(cassandraSchema)

    // Column selections shared by the inner and outer join RDDs below.
    val selectedColumns = SomeColumns(cassandraScan.cqlQueryParts.selectedColumnRefs: _*)
    val joinKeyColumns = SomeColumns(joinColumns: _*)

    /* Turns one partition of (key row, Cassandra row) pairs into output rows.
     * The JoinedRow is allocated per partition (not on the driver and shipped in the closure)
     * and reused for every pair. The metric is bumped only after boundCondition accepts a row,
     * so numOutputRows reflects emitted rows rather than candidate pairs. */
    def joinPartition(pairs: Iterator[(UnsafeRow, InternalRow)]): Iterator[InternalRow] = {
      val resultProjection = createResultProjection
      val joinRow = new JoinedRow
      pairs
        .map { case (keyRow, cassandraRow) =>
          joinRow.withLeft(keyRow)
          joinRow.withRight(cassandraRow)
          resultProjection(joinRow)
        }
        .filter(boundCondition)
        .map { row =>
          numOutputRows.add(1)
          row
        }
    }

    joinType match {
      case LeftOuter | RightOuter =>
        // NOTE(review): boundCondition runs after null-extension, so a key row whose matches all
        // fail the condition (or whose null-extended row fails it) is dropped instead of being
        // emitted with nulls. Confirm the planner never hands an outer join a residual condition
        // that references Cassandra columns.
        val joinRDD = new CassandraLeftJoinRDD[UnsafeRow, UnsafeRow](
          unsafeKeyRows,
          keyspace,
          table,
          connector,
          selectedColumns,
          joinKeyColumns,
          cqlQueryParts.whereClause,
          cqlQueryParts.limitClause,
          cqlQueryParts.clusteringOrder,
          readConf)

        joinRDD.mapPartitions { pairs =>
          // Stand-in for the Cassandra side when a key row has no match.
          val nullRow: InternalRow = new GenericInternalRow(cassandraPlan.output.length)
          joinPartition(pairs.map { case (keyRow, maybeCassandraRow) =>
            (keyRow, maybeCassandraRow.getOrElse(nullRow))
          })
        }

      case _ =>
        val joinRDD = new CassandraJoinRDD[UnsafeRow, UnsafeRow](
          unsafeKeyRows,
          keyspace,
          table,
          connector,
          selectedColumns,
          joinKeyColumns,
          cqlQueryParts.whereClause,
          cqlQueryParts.limitClause,
          cqlQueryParts.clusteringOrder,
          readConf)

        joinRDD.mapPartitions(pairs => joinPartition(pairs))
    }
  }