def apply()

in integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala [1445:1562]


  /**
   * Attempts the "SelectSelectGroupbyChildDelta" rewrite: both `subsumer` and `subsumee`
   * are a Select over a single GroupBy child, and `compensation` is a GroupBy.
   *
   * The rewrite is produced only when all of the following hold:
   *  - the subsumer reads no leaf table that the subsumee does not read (no extra join);
   *  - every subsumer predicate is matched by a subsumee or compensation predicate;
   *  - every subsumee predicate is matched by a subsumer predicate or is derivable;
   *  - every (de-duplicated) subsumee output expression is derivable;
   *  - the compensation's child is a Select whose predicates are all derivable;
   *  - every compensation predicate is derivable.
   * "Derivable" means `isDerivable` accepts the expression against the subsumer's output
   * list extended with the output of the leaf tables only the subsumee reads.
   *
   * @param subsumer     candidate plan to be reused
   * @param subsumee     plan that should be rewritten on top of `subsumer`
   * @param compensation plan compensating the difference found at the child level;
   *                     this pattern only applies when it is a GroupBy
   * @param generator    not referenced by this pattern
   * @return a single-element sequence holding the rewritten plan, or `Nil` when the
   *         pattern does not apply
   */
  def apply(
      subsumer: ModularPlan,
      subsumee: ModularPlan,
      compensation: Option[ModularPlan],
      generator: SubqueryNameGenerator): Seq[ModularPlan] = {
    (subsumer, subsumee, compensation, subsumer.children, subsumee.children) match {
      case (
        sel_3a@modular.Select(
        _, _, Nil, _, _,
        Seq(modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _),
        sel_3q_dup@modular.Select(
        _, _, _, _, _,
        Seq(modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _),
        Some(gb_2c@modular.GroupBy(_, _, _, _, _, _, _, _)),
        _ :: Nil,
        _ :: Nil) =>
        LOGGER.debug(s"Applying pattern: {SelectSelectGroupbyChildDelta} for the plan: " +
                     s"{ ${ subsumee.toString().trim } }. " +
                     s"Current Subsumer: { ${ subsumer.toString().trim } }. " +
                     s"Compensation: { ${ gb_2c.toString().trim } }")
        val tbls_sel_3a = sel_3a.collect { case tbl: modular.LeafNode => tbl }
        val tbls_sel_3q = sel_3q_dup.collect { case tbl: modular.LeafNode => tbl }
        // The checks below run against the subsumee with a de-duplicated output list; the
        // original (possibly duplicated) output list is used again when building the result.
        val sel_3q = sel_3q_dup.copy(
          outputList = getDistinctOutputList(sel_3q_dup.outputList))

        // Leaf tables read only by the subsumer (must be empty for a match) ...
        val extrajoin = tbls_sel_3a.filterNot(tbls_sel_3q.contains)
        // ... and leaf tables read only by the subsumee.
        val rejoin = tbls_sel_3q.filterNot(tbls_sel_3a.contains)
        val rejoinOutputList = rejoin.flatMap(_.output)
        // Expressions available for derivation; built once instead of once per checked
        // expression.
        val derivationSource = sel_3a.outputList ++ rejoinOutputList

        // NOTE(review): the subsumer pattern above pins the third Select field to Nil; if
        // that field is predicateList this check is vacuously true - confirm.
        val isPredicateRmE = sel_3a.predicateList.forall(expr =>
          sel_3q.predicateList.exists(_.semanticEquals(expr)) ||
          isExpressionMatches(expr, sel_3q.predicateList) ||
          gb_2c.predicateList.exists(_.semanticEquals(expr)) ||
          isExpressionMatches(expr, gb_2c.predicateList))
        val isPredicateEmdR = sel_3q.predicateList.forall(expr =>
          sel_3a.predicateList.exists(_.semanticEquals(expr)) ||
          isExpressionMatches(expr, sel_3a.predicateList) ||
          isDerivable(expr, derivationSource, sel_3q, sel_3a, compensation))
        val isOutputEdR = sel_3q.outputList.forall(expr =>
          isDerivable(expr, derivationSource, sel_3q, sel_3a, compensation))

        // Resolve the compensation's child once; it is needed both for the pull-up check
        // and for building the rewritten plan.
        val childSelect = gb_2c.child match {
          case s: Select => Some(s)
          case _ => None
        }
        val canSELPullUp = childSelect.exists(
          _.predicateList.forall(expr =>
            isDerivable(expr, derivationSource, sel_3q, sel_3a, compensation)))
        val canGBPullUp = gb_2c.predicateList.forall(expr =>
          isDerivable(expr, derivationSource, sel_3q, sel_3a, compensation))

        val isMatch = extrajoin.isEmpty &&
                      isPredicateRmE &&
                      isPredicateEmdR &&
                      isOutputEdR &&
                      canSELPullUp &&
                      canGBPullUp

        childSelect match {
          case Some(s) if isMatch =>
            // Replace the GroupBy child of the compensation's Select with the subsumer.
            val sel_3c1 = s.withNewChildren(
              s.children.map {
                case _: GroupBy => sel_3a.setSkip()
                case other => other
              })
            val gb_3c2 = gb_2c.copy(child = sel_3c1)

            // Substitute references to the compensation's aliased outputs by the aliases
            // themselves. AliasWrapper is applied in the first pass and stripped in the
            // second.
            val aliasMap_exp = AttributeMap(
              gb_2c.outputList.collect {
                case a: Alias => (a.toAttribute, AliasWrapper(a))
              })
            val sel_3q_exp = sel_3q_dup.transformExpressions({
              case attr: Attribute if aliasMap_exp.contains(attr) => aliasMap_exp(attr)
            }).transformExpressions {
              case AliasWrapper(alias: Alias) => alias
            }

            // Pair every subsumee output with the semantically matching compensation
            // output. When none matches, fall back to the compensation output at the same
            // position.
            // NOTE(review): that positional fallback throws IndexOutOfBoundsException if
            // the subsumee has more outputs than the compensation - confirm this cannot
            // happen.
            val mappings = sel_3q_exp.outputList.zipWithIndex.map { case (exp, index) =>
              val matched = gb_2c.outputList.find {
                case a: Alias if exp.isInstanceOf[Alias] =>
                  a.child.semanticEquals(exp.children.head) ||
                    isExpressionMatches(a.child, exp.children.head)
                case a: Alias => a.child.semanticEquals(exp)
                case other => other.semanticEquals(exp)
              }
              (exp, matched.getOrElse(gb_2c.outputList(index)))
            }

            val oList = CarbonToSparkAdapter.createAliases(mappings)
            val wip = sel_3q_exp.copy(outputList = oList, children = Seq(gb_3c2))
            Seq(factorOutSubsumer(wip, sel_3a, s.aliasMap))

          case _ => Nil
        }

      case _ => Nil
    }
  }