def apply()

in integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala [660:879]


  /**
   * Tries to rewrite a subsumee `Select` in terms of a subsumer `Select` when there is
   * no pending compensation plan.
   *
   * Two shapes are handled; everything else yields `Nil`:
   *  - both plans are `Select`s whose children are all `LeafNode`s;
   *  - both plans are `Select`s whose children are all `GroupBy`s (and each has exactly
   *    one child).
   *
   * @param subsumer     candidate plan that may cover the subsumee
   * @param subsumee     plan to be rewritten on top of the subsumer
   * @param compensation must be `None` for either pattern to apply
   * @param generator    supplies the alias under which the subsumer is referenced in a
   *                     rewritten plan
   * @return a single-element sequence holding the subsumer itself, a copy of the subsumer
   *         carrying the subsumee's flags, or the result of `factorOutSubsumer`; `Nil`
   *         when the plans do not match
   */
  def apply(subsumer: ModularPlan,
      subsumee: ModularPlan,
      compensation: Option[ModularPlan],
      generator: SubqueryNameGenerator): Seq[ModularPlan] = {

    (subsumer, subsumee, compensation) match {
      case (
        sel_1a @ modular.Select(_, _, _, _, _, _, _, _, _, _),
        sel_1q @ modular.Select(_, _, _, _, _, _, _, _, _, _), None
        ) if sel_1a.children.forall { _.isInstanceOf[modular.LeafNode] } &&
             sel_1q.children.forall { _.isInstanceOf[modular.LeafNode] } =>

        LOGGER.debug(s"Applying pattern: {SelectSelectNoChildDelta} for the plan: " +
                     s"{ ${ subsumee.toString().trim } }. " +
                     s"Current Subsumer: { ${ subsumer.toString().trim } }. ")

        // assume children (including harmonized relation) of subsumer and subsumee
        // are 1-1 correspondence.
        // Change the following two conditions to more complicated ones if we want to
        // consider things that combine extrajoin, rejoin, and harmonized relations
        // Despite the "is" prefix these hold the offending children: each collects the
        // children of one side that do not have exactly one counterpart on the other side.
        val isUniqueRmE = subsumer.children.filter { x => subsumee.children.count{
          case relation: ModularRelation => relation.fineEquals(x)
          case other => other == x
        } != 1 }
        val isUniqueEmR = subsumee.children.filter { x => subsumer.children.count{
          case relation: ModularRelation => relation.fineEquals(x)
          case other => other == x
        } != 1 }

        // extrajoin: subsumer children absent from the subsumee.
        // rejoin: subsumee children absent from the subsumer.
        val extrajoin = sel_1a.children.filterNot { child => sel_1q.children.contains(child) }
        val rejoin = sel_1q.children.filterNot { child => sel_1a.children.contains(child) }
        val rejoinOutputList = rejoin.flatMap(_.output)

        val isPredicateRmE = sel_1a.predicateList.forall(expr =>
          sel_1q.predicateList.exists(_.semanticEquals(expr)) ||
          isExpressionMatches(expr, sel_1q.predicateList))
        val isPredicateEmdR = sel_1q.predicateList.forall(expr =>
          isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None))
        // An empty subsumee output list must not match. `forall` is vacuously true on an
        // empty list, hence the explicit nonEmpty guard.
        val isOutputEdR = sel_1q.outputList.nonEmpty && sel_1q.outputList.forall(expr =>
          isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None))

        if (isUniqueRmE.isEmpty && isUniqueEmR.isEmpty && extrajoin.isEmpty && isPredicateRmE &&
            isPredicateEmdR && isOutputEdR) {
          // extrajoin.isEmpty (checked above) means every subsumer child is contained in the
          // subsumee's children, so the guard below always holds in this branch.
          val mappings = sel_1a.children.zipWithIndex.map {
            case (child, fromIndex) if sel_1q.children.contains(child) =>
              val toIndex = sel_1q.children.indexWhere{
                case relation: ModularRelation => relation.fineEquals(child)
                case other => other == child
              }
              (toIndex -> fromIndex)

          }
          // e2r: subsumee child index -> subsumer child index; r2e is the inverse.
          val e2r = mappings.toMap
          val r2e = e2r.map(_.swap)
          val r2eJoinsMatch = sel_1a.joinEdges.forall { x =>
            (r2e.get(x.left), r2e.get(x.right)) match {
              case (Some(l), Some(r)) =>
                val mappedEdge = JoinEdge(l, r, x.joinType)
                val joinTypeEquivalent =
                  if (sel_1q.joinEdges.contains(mappedEdge)) {
                    true
                  } else {
                    x.joinType match {
                      case Inner | FullOuter =>
                        // also accept the same edge with its endpoints swapped
                        sel_1q.joinEdges.contains(JoinEdge(r, l, x.joinType))
                      case LeftOuter if isLeftJoinView(sel_1a) =>
                        sel_1q.joinEdges.contains(JoinEdge(l, r, Inner)) ||
                        sel_1q.joinEdges.contains(JoinEdge(r, l, Inner))
                      case _ => false
                    }
                  }
                if (joinTypeEquivalent) {
                  // Join conditions must match in both directions.
                  val sel_1a_join = sel_1a.extractJoinConditions(
                    sel_1a.children(x.left),
                    sel_1a.children(x.right))
                  val sel_1q_join = sel_1q.extractJoinConditions(
                    sel_1q.children(mappedEdge.left),
                    sel_1q.children(mappedEdge.right))
                  sel_1a_join.forall(e => sel_1q_join.exists(e.semanticEquals) ||
                                          isExpressionMatches(e, sel_1q_join)) &&
                  sel_1q_join.forall(e => sel_1a_join.exists(e.semanticEquals) ||
                                          isExpressionMatches(e, sel_1a_join))
                } else false
              case _ => false
            }
          }

          val isPredicateEmR = sel_1q.predicateList.forall(expr =>
            sel_1a.predicateList.exists(_.semanticEquals(expr)) ||
            isExpressionMatches(expr, sel_1a.predicateList))
          val isOutputEmR = compareOutputList(sel_1q.outputList, sel_1a.outputList)
          val isOutputRmE = compareOutputList(sel_1a.outputList, sel_1q.outputList)
          // headOption instead of head: this value is computed before r2eJoinsMatch is
          // consulted, so a subsumee without join edges must not throw here.
          val isLeftViewOverInnerQuery =
            isLeftJoinView(sel_1a) && sel_1q.joinEdges.headOption.exists(_.joinType == Inner)
          val isLOEmLOR = !isLeftViewOverInnerQuery

          if (r2eJoinsMatch) {
            if (isPredicateEmR && isOutputEmR && isOutputRmE && rejoin.isEmpty && isLOEmLOR) {
              // Predicates and outputs match in both directions and nothing needs to be
              // re-joined: the subsumer is returned, carrying the subsumee's flags if any.
              if (sel_1q.flagSpec.isEmpty) {
                Seq(sel_1a)
              } else {
                Seq(sel_1a.copy(flags = sel_1q.flags, flagSpec = sel_1q.flagSpec))
              }
            } else {
              // Build a rewritten copy of the subsumee's Select with the subsumer as child 0,
              // followed by the subsumee children the subsumer does not cover.
              val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
              val tAliasMap = new collection.mutable.HashMap[Int, String]()

              val usel_1a = sel_1a.copy(outputList = sel_1a.outputList)
              tChildren += usel_1a
              tAliasMap += (tChildren.indexOf(usel_1a) -> generator.newSubsumerName())

              sel_1q.children.zipWithIndex.foreach {
                case (child, idx) =>
                  if (e2r.get(idx).isEmpty) {
                    tChildren += child
                    sel_1q.aliasMap.get(idx)
                      .foreach(x => tAliasMap += (tChildren.indexOf(child) -> x))
                  }
              }

              // Re-index one endpoint of a subsumee join edge: children covered by the
              // subsumer collapse onto slot 0, the others use their position in tChildren.
              def reindex(idx: Int): Int =
                if (e2r.contains(idx)) 0 else tChildren.indexOf(sel_1q.children(idx))

              // Edges whose endpoints are both covered by the subsumer are dropped.
              val tJoinEdges = sel_1q.joinEdges.collect {
                case JoinEdge(le, re, joinType) if !(e2r.contains(le) && e2r.contains(re)) =>
                  JoinEdge(reindex(le), reindex(re), joinType)
              }
              // Keep only the subsumee predicates that the subsumer does not already apply.
              // For a left-join view answering an inner-join query, additionally require the
              // harmonized relation's tag (if present) to be non-null.
              val tPredicateList = sel_1q.predicateList.filter { p =>
                !sel_1a.predicateList.exists(_.semanticEquals(p))
              } ++ (if (isLeftViewOverInnerQuery) {
                sel_1a.children(1)
                  .asInstanceOf[HarmonizedRelation].tag.map(IsNotNull(_)).toSeq
              } else {
                Seq.empty
              })
              val sel_1q_temp = sel_1q.copy(
                predicateList = tPredicateList,
                children = tChildren,
                joinEdges = tJoinEdges,
                aliasMap = tAliasMap.toMap)

              val done = factorOutSubsumer(sel_1q_temp, usel_1a, sel_1q_temp.aliasMap)
              Seq(done)
            }
          } else Nil
        } else Nil

      case (
        sel_3a @ modular.Select(_, _, _, _, _, _, _, _, _, _),
        sel_3q @ modular.Select(_, _, _, _, _, _, _, _, _, _), None)
        if sel_3a.children.forall(_.isInstanceOf[GroupBy]) &&
           sel_3q.children.forall(_.isInstanceOf[GroupBy]) =>
        LOGGER.debug(s"Applying pattern: {SelectSelectNoChildDelta} for the plan: " +
                     s"{ ${ subsumee.toString().trim } }. " +
                     s"Current Subsumer: { ${ subsumer.toString().trim } }")
        val isPredicateRmE = sel_3a.predicateList.isEmpty ||
                             sel_3a.predicateList.forall(expr =>
                               sel_3q.predicateList.exists(_.semanticEquals(expr)) ||
                               isExpressionMatches(expr, sel_3q.predicateList))
        val isPredicateEmdR = sel_3q.predicateList.isEmpty ||
                              sel_3q.predicateList.forall(expr =>
                                sel_3a.predicateList.exists(_.semanticEquals(expr) ||
                                                            isExpressionMatches(expr,
                                                              sel_3a.predicateList)) ||
                                isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
        val isOutputEdR = sel_3q.outputList.forall(expr =>
          isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
        val isSingleChild = sel_3a.children.length == 1 && sel_3q.children.length == 1

        if (isPredicateRmE && isPredicateEmdR && isOutputEdR && isSingleChild) {
          val isPredicateEmR = sel_3q.predicateList.isEmpty ||
                               sel_3q.predicateList.forall(expr =>
                                 sel_3a.predicateList.exists(_.semanticEquals(expr)) ||
                                 isExpressionMatches(expr, sel_3a.predicateList))
          val isOutputRmE = sel_3a.outputList.forall(expr =>
            isDerivable(expr, sel_3q.outputList, sel_3a, sel_3q, None))
          val isOutputEmR = sel_3q.outputList.forall(expr =>
            isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))

          if (isPredicateEmR && isOutputEmR && isOutputRmE) {
            // Predicates and outputs match in both directions: the subsumer is the answer.
            Seq(sel_3a)
          } else if (isPredicateEmR && isOutputEmR) {
            // Re-express the subsumee's aliases as attributes of the subsumer's output and
            // place the rewritten Select on top of the subsumer.
            // NOTE(review): `.get` throws if no aliased subsumer output matches an alias of
            // the subsumee; confirm that isOutputEmR is meant to rule this out.
            val sel_3q_exp = sel_3q.transformExpressions({
              case a: Alias => sel_3a.outputList
                .find { a1 =>
                  a1.isInstanceOf[Alias] &&
                    (a1.asInstanceOf[Alias].child.semanticEquals(a.child) ||
                      isExpressionMatches(a1.asInstanceOf[Alias].child, a.child))
                }.map(_.toAttribute).get
            })
            val wip = sel_3q_exp.copy(
              children = Seq(sel_3a),
              aliasMap = Seq(0 -> generator.newSubsumerName()).toMap)
            val done = factorOutSubsumer(wip, sel_3a, wip.aliasMap)
            Seq(done)
          } else {
            Nil
          }
        } else Nil

      case _ => Nil
    }
  }