in connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala [45:91]
/**
 * Attempts to replace a qualifying equi-join with a [[CassandraDirectJoinExec]].
 * Returns the rewritten physical plan in a single-element Seq, or Nil when the
 * join does not qualify (Catalyst then falls through to other strategies).
 */
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, _, left, right, _)
      if hasValidDirectJoin(joinType, leftKeys, rightKeys, condition, left, right) =>

    // Decide which branch is the Cassandra-backed direct-join target; the
    // opposite branch is planned normally. buildType mirrors the chosen side.
    val targetIsLeft = leftValid(joinType, leftKeys, rightKeys, condition, left, right)
    val (otherBranch, joinTargetBranch, buildType) =
      if (targetIsLeft) (right, left, BuildLeft) else (left, right, BuildRight)

    /* We want to take advantage of all of our pushed filter code which happens in
       full table scans. Unfortunately the pushdown code itself is private within the
       DataSourceV2Strategy class. To work around this we invoke DataSourceV2Strategy
       on our target branch, which tells us all of the pushable filters that we can
       use in the direct join. */
    val dataSourceOptimizedPlan = new DataSourceV2Strategy(spark)(joinTargetBranch).head
    // NOTE(review): `.get` relies on hasValidDirectJoin guaranteeing that the
    // optimized branch contains a Cassandra scan — confirm that invariant holds.
    val cassandraScanExec = getScanExec(dataSourceOptimizedPlan).get

    joinTargetBranch match {
      case PhysicalOperation(attributes, _, DataSourceV2ScanRelation(DataSourceV2Relation(_: CassandraTable, _, _, _, _), _, _, _, _)) =>
        val directJoin = CassandraDirectJoinExec(
          leftKeys,
          rightKeys,
          joinType,
          buildType,
          condition,
          planLater(otherBranch),
          aliasMap(attributes),
          cassandraScanExec)
        val reordered = reorderPlan(dataSourceOptimizedPlan, directJoin, plan.output)
        val newPlan = reordered :: Nil
        val newOutput = (reordered.outputSet, reordered.output.map(_.name))
        val oldOutput = (plan.outputSet, plan.output.map(_.name))
        // Sanity check: the rewritten plan must still expose every attribute
        // that the original plan produced.
        val noMissingOutput = oldOutput._1.subsetOf(reordered.outputSet)
        require(noMissingOutput, s"Cassandra DirectJoin Optimization produced invalid output. Original plan output: " +
          s"${oldOutput} was not part of ${newOutput} \nOld Plan\n${plan}\nNew Plan\n${newPlan}")
        newPlan
      case _ => Nil // Unable to do optimization on target branch
    }
  case _ => Nil // No valid Target for Join Optimization
}