def reorderPlan()

in connector/src/main/scala/org/apache/spark/sql/cassandra/execution/CassandraDirectJoinStrategy.scala [253:296]


  /**
   * Splices the given [[CassandraDirectJoinExec]] into `plan` in place of the
   * Cassandra batch scan it replaces, then repairs any projection aliases so
   * they refer to the direct join's (possibly renamed/re-nullable) output.
   *
   * @param plan           the physical plan fragment that currently contains a
   *                       Cassandra [[BatchScanExec]] (possibly as its only node)
   * @param directJoin     the direct-join node to substitute for the scan
   * @param originalOutput the output attributes of the original join, used to
   *                       preserve the projection's column order
   * @return the plan with the direct join spliced in and aliases re-bound
   */
  def reorderPlan(
      plan: SparkPlan,
      directJoin: CassandraDirectJoinExec,
      originalOutput: Seq[Attribute]): SparkPlan = {
    val reordered = plan match {
      // The Cassandra scan may be the only node in the plan
      case BatchScanExec(_, _: CassandraScan, _, _, _, _) => directJoin
      // Plan has children: replace the child of the node directly above the scan
      case normalPlan => normalPlan.transform {
        case penultimate if hasCassandraChild(penultimate) =>
          penultimate.withNewChildren(Seq(directJoin))
      }
    }

    // Built once up front: directJoin.output does not change while the
    // transform below visits nodes, so there is no need to rebuild this
    // lookup for every ProjectExec encountered.
    val attrMap = directJoin.output.map(attr => attr.exprId -> attr).toMap

    /*
    The output of our new join node may be missing some aliases which were
    previously applied to columns coming out of cassandra. Take the directJoin output and
    make sure all aliases are correctly applied to the new attributes. Nullability is a
    concern here as columns which may have been non-nullable previously, become nullable in
    a left/right join
    */
    reordered.transform {
      case ProjectExec(projectList, child) =>
        // Rebuild each alias so its children reference the direct join's
        // attributes (matched by exprId) instead of the stale scan output.
        // NOTE: `aliasChild` deliberately avoids shadowing ProjectExec's `child`.
        val aliases = projectList.collect {
          case a @ Alias(aliasChild, _) =>
            val newAliasChild = aliasChild.transform {
              case attr: Attribute => attrMap.getOrElse(attr.exprId, attr)
            }
            (a.exprId, a.withNewChildren(newAliasChild :: Nil).asInstanceOf[Alias])
        }.toMap

        // Preserve the original join's output order, substituting the
        // re-bound aliases where they exist.
        val reorderedOutput = originalOutput.map {
          case attr if aliases.contains(attr.exprId) => aliases(attr.exprId)
          case other => other
        }

        ProjectExec(reorderedOutput, child)
    }
  }