in connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinExec.scala [144:221]
/**
 * Executes the direct join: every row produced by `keySource` is used as a lookup key
 * against the Cassandra table, and each (key row, Cassandra row) pair is run through the
 * result projection and then filtered by `boundCondition`.
 *
 * `LeftOuter` and `RightOuter` join types are served by a [[CassandraLeftJoinRDD]], where a
 * key without a matching Cassandra row is paired with a placeholder row; every other join
 * type is served by a [[CassandraJoinRDD]].
 *
 * `numOutputRows` is incremented only for rows that pass `boundCondition`, i.e. rows that
 * are actually emitted by this operator.
 *
 * @return the joined rows
 */
override protected def doExecute(): RDD[InternalRow] = {
  /* UnsafeRows are pointers to spots in memory, and when our UnsafeProjection is
   * applied to the next element it rewrites the memory our first pointer refers to.
   * Since the join requests are executed asynchronously, we would lose the join key
   * unless every projected key row is copied before the projection is reused.
   *
   * See UnsafeRow.copy() and UnsafeProjection.
   */
  val unsafeKeyRows: RDD[UnsafeRow] = keySource
    .execute()
    .mapPartitions { keyRows =>
      val projection = UnsafeProjection.create(keySource.schema)
      keyRows.map(row => projection.apply(row).copy())
    }

  implicit val rwf = new UnsafeRowWriterFactory(joinExpressions)
  implicit val rrf = new UnsafeRowReaderFactory(cassandraSchema)

  // Inner-style join: only keys that have a matching Cassandra row produce a pair.
  def innerJoin(): RDD[InternalRow] = {
    val joinRDD = new CassandraJoinRDD[UnsafeRow, UnsafeRow](
      unsafeKeyRows,
      keyspace,
      table,
      connector,
      SomeColumns(cassandraScan.cqlQueryParts.selectedColumnRefs: _*),
      SomeColumns(joinColumns: _*),
      cqlQueryParts.whereClause,
      cqlQueryParts.limitClause,
      cqlQueryParts.clusteringOrder,
      readConf)

    joinRDD.mapPartitions { pairs =>
      val resultProjection = createResultProjection
      // One reusable joined-row buffer per partition iterator; it is overwritten for
      // every pair right before the projection reads it.
      val joinRow = new JoinedRow
      pairs
        .map { case (unsafeKeyRow, cassandraRow) =>
          joinRow.withLeft(unsafeKeyRow)
          joinRow.withRight(cassandraRow)
          resultProjection(joinRow)
        }
        .filter(boundCondition)
        .map { row =>
          // Count after the filter so rejected rows are not reported as output.
          numOutputRows.add(1)
          row
        }
    }
  }

  // Outer-style join: the Cassandra side is optional and is replaced by a placeholder
  // row when no match was found for the key.
  def outerJoin(): RDD[InternalRow] = {
    val joinRDD = new CassandraLeftJoinRDD[UnsafeRow, UnsafeRow](
      unsafeKeyRows,
      keyspace,
      table,
      connector,
      SomeColumns(cassandraScan.cqlQueryParts.selectedColumnRefs: _*),
      SomeColumns(joinColumns: _*),
      cqlQueryParts.whereClause,
      cqlQueryParts.limitClause,
      cqlQueryParts.clusteringOrder,
      readConf)

    joinRDD.mapPartitions { pairs =>
      val resultProjection = createResultProjection
      // Placeholder for a missing Cassandra row: one slot per Cassandra output column.
      val nullRow = new GenericInternalRow(cassandraPlan.output.length)
      // One reusable joined-row buffer per partition iterator.
      val joinRow = new JoinedRow
      pairs
        .map { case (unsafeKeyRow, cassandraRow) =>
          joinRow.withLeft(unsafeKeyRow)
          joinRow.withRight(cassandraRow.getOrElse(nullRow))
          resultProjection(joinRow)
        }
        .filter(boundCondition)
        .map { row =>
          // Count after the filter so rejected rows are not reported as output.
          numOutputRows.add(1)
          row
        }
    }
  }

  joinType match {
    case LeftOuter | RightOuter => outerJoin()
    case _ => innerJoin()
  }
}