in connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala [253:296]
/**
 * Splices a [[CassandraDirectJoinExec]] into `plan` at the position of the
 * Cassandra scan it replaces, then repairs projection aliases so they refer
 * to the direct join's output attributes.
 *
 * @param plan           the physical plan to rewrite
 * @param directJoin     the direct-join node that replaces the Cassandra scan
 * @param originalOutput the output attributes of the original join, used to
 *                       re-apply aliases in their original order
 * @return the rewritten plan with aliases rebound to the new attributes
 */
def reorderPlan(
    plan: SparkPlan,
    directJoin: CassandraDirectJoinExec,
    originalOutput: Seq[Attribute]): SparkPlan = {
  val reordered = plan match {
    // The Cassandra scan may be the only node in the plan
    case BatchScanExec(_, _: CassandraScan, _, _, _, _) => directJoin
    // Plan has children: replace the child of the node sitting directly
    // above the Cassandra scan with the direct join
    case normalPlan => normalPlan.transform {
      case penultimate if hasCassandraChild(penultimate) =>
        penultimate.withNewChildren(Seq(directJoin))
    }
  }

  /*
  The output of our new join node may be missing some aliases which were
  previously applied to columns coming out of cassandra. Take the directJoin output and
  make sure all aliases are correctly applied to the new attributes. Nullability is a
  concern here as columns which may have been non-nullable previously, become nullable in
  a left/right join
  */
  // The lookup depends only on directJoin, so build it once rather than
  // once per ProjectExec node encountered during the transform.
  val attrMap = directJoin.output.map(attr => attr.exprId -> attr).toMap

  reordered.transform {
    case ProjectExec(projectList, projectChild) =>
      // Rebind each alias's children to the (possibly nullability-adjusted)
      // attributes produced by the direct join.
      val aliases = projectList.collect {
        case a @ Alias(aliasChild, _) =>
          val newAliasChild = aliasChild.transform {
            case attr: Attribute => attrMap.getOrElse(attr.exprId, attr)
          }
          (a.exprId, a.withNewChildren(newAliasChild :: Nil).asInstanceOf[Alias])
      }.toMap
      // Re-emit the original output order of the join, substituting the
      // repaired aliases where one exists for the attribute's exprId.
      val reorderedOutput = originalOutput.map {
        case attr if aliases.contains(attr.exprId) => aliases(attr.exprId)
        case other => other
      }
      ProjectExec(reorderedOutput, projectChild)
  }
}