override def apply()

in connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala [45:91]


  /**
   * Attempts to plan an equi-join as a [[CassandraDirectJoinExec]].
   *
   * Applies only when `hasValidDirectJoin` accepts the join. The branch selected by `leftValid`
   * (left if it passes, otherwise right) becomes the join target and must be a Cassandra
   * DataSource V2 scan. The other branch is planned normally through `planLater`.
   *
   * Whenever the rewrite cannot be performed, `Nil` is returned so that other planner strategies
   * can handle the join. This covers: no valid direct join, a target branch that is not a
   * Cassandra scan, no physical plan from `DataSourceV2Strategy`, or no scan exec found by
   * `getScanExec`.
   *
   * @param plan the logical plan to plan
   * @return a single-element sequence holding the rewritten physical plan, or `Nil`
   * @throws IllegalArgumentException if the rewritten plan does not produce every attribute of
   *                                  the original plan's output
   */
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, _, left, right, _)
      if hasValidDirectJoin(joinType, leftKeys, rightKeys, condition, left, right) =>

      // The side accepted by leftValid is the Cassandra side (join target); the other side drives lookups.
      val (otherBranch, joinTargetBranch, buildType) =
        if (leftValid(joinType, leftKeys, rightKeys, condition, left, right)) {
          (right, left, BuildLeft)
        } else {
          (left, right, BuildRight)
        }

      joinTargetBranch match {
        case PhysicalOperation(attributes, _, DataSourceV2ScanRelation(DataSourceV2Relation(_: CassandraTable, _, _, _, _), _, _, _, _)) =>
          /* We want to take advantage of all of our pushed filter code which happens in
             full table scans. Unfortunately the pushdown code itself is private within the
             DataSourceV2Strategy class. To work around this we will invoke DataSourceV2Strategy on
             our target branch. This will let us know all of the pushable filters that we can
             use in the direct join.
             This is only done once the target branch is known to be a Cassandra scan, and a
             missing physical plan or scan exec declines the optimization instead of throwing.
          */
          val rewritten = for {
            dataSourceOptimizedPlan <- new DataSourceV2Strategy(spark)(joinTargetBranch).headOption
            cassandraScanExec <- getScanExec(dataSourceOptimizedPlan)
          } yield {
            val directJoin =
              CassandraDirectJoinExec(
                leftKeys,
                rightKeys,
                joinType,
                buildType,
                condition,
                planLater(otherBranch),
                aliasMap(attributes),
                cassandraScanExec
              )

            val newPlan = reorderPlan(dataSourceOptimizedPlan, directJoin, plan.output) :: Nil
            val newOutput = (newPlan.head.outputSet, newPlan.head.output.map(_.name))
            val oldOutput = (plan.outputSet, plan.output.map(_.name))
            // Sanity check: the rewrite must not drop any attribute the original join exposed.
            val noMissingOutput = oldOutput._1.subsetOf(newPlan.head.outputSet)
            require(noMissingOutput, s"Cassandra DirectJoin Optimization produced invalid output. Original plan output: " +
              s"${oldOutput} was not part of ${newOutput} \nOld Plan\n${plan}\nNew Plan\n${newPlan}")

            newPlan
          }

          rewritten.getOrElse(Nil) //No physical scan to build the direct join from
        case _ => Nil //Unable to do optimization on target branch
      }
    case _ => Nil //No valid Target for Join Optimization
  }