def apply()

in integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala [1082:1190]


  /**
   * Tries to rewrite a GroupBy subsumee on top of a GroupBy subsumer when the compensation
   * handed in is a Select.
   *
   * The rewrite is attempted only when both GroupBy nodes are free of the EXPAND flag, the
   * compensation is `Some(Select)`, and the two pre-checks computed at the top of the method
   * (`aggInputEinR`, `compensationSelectOnly`) both hold. In every other case `Nil` is returned.
   *
   * @param subsumer     plan being matched against; must be a GroupBy for this pattern
   * @param subsumee     plan to be rewritten; must be a GroupBy for this pattern
   * @param compensation compensation plan; must be a Select for this pattern
   * @param generator    subquery name generator; not referenced by this pattern
   * @return a single-element sequence holding the rewritten plan, or `Nil` when the pattern
   *         does not apply or the rewrite cannot be derived
   */
  def apply(
      subsumer: ModularPlan,
      subsumee: ModularPlan,
      compensation: Option[ModularPlan],
      generator: SubqueryNameGenerator): Seq[ModularPlan] = {
    // Every aggregate expression found directly in the subsumee's expressions must reference
    // only attributes that the subsumer outputs.
    val aggInputEinR = subsumee.expressions
      .collect { case agg: aggregate.AggregateExpression =>
        AttributeSet(Seq(agg)).subsetOf(subsumer.outputSet)
      }.forall(identity)
    // NOTE(review): `collect` yields `Class` objects while `GroupBy` here names the companion
    // object, so `contains` looks like it can never be true and this flag is always true.
    // Behaviour is kept as-is because tightening it would change which plans match; confirm
    // the intent before fixing.
    val compensationSelectOnly = !compensation.map { _.collect { case n => n.getClass } }
      .exists(_.contains(GroupBy))

    (subsumer, subsumee, compensation, aggInputEinR, compensationSelectOnly) match {
      case (
        gb_2a @ GroupBy(_, _, _, _, _, _, _, _),
        gb_2q @ GroupBy(_, _, _, _, _, _, _, _),
        Some(sel_1c1 @ Select(_, _, _, _, _, _, _, _, _, _)),
        true,
        true)
        if !gb_2q.flags.hasFlag(EXPAND) && !gb_2a.flags.hasFlag(EXPAND) =>

        LOGGER.debug(s"Applying pattern: {GroupbyGroupbySelectOnlyChildDelta} for the plan: " +
                     s"{ ${ subsumee.toString().trim } }. " +
                     s"Current Subsumer: { ${ subsumer.toString().trim } }. " +
                     s"Compensation: { ${ sel_1c1.toString().trim } }")

        // Output of every compensation child except the first one. `drop(1)` instead of `tail`
        // so that a Select without children gives an empty list rather than throwing.
        val rejoinOutputList = sel_1c1.children.drop(1).flatMap(_.output)
        // Expressions that grouping keys and compensation predicates must be derivable from.
        val derivationSource = gb_2a.predicateList ++ rejoinOutputList

        val isGroupingEdR = gb_2q.predicateList.forall { expr =>
          isDerivable(expr, derivationSource, gb_2q, gb_2a, compensation)
        }
        // Regrouping is needed as soon as one subsumer grouping key has no counterpart among
        // the subsumee grouping keys.
        val needRegrouping = !gb_2a.predicateList.forall { key =>
          gb_2q.predicateList.contains(key) || isExpressionMatches(key, gb_2q.predicateList)
        }
        val canPullUp = sel_1c1.predicateList.forall { expr =>
          isDerivable(expr, derivationSource, gb_2q, gb_2a, compensation)
        }
        // NOTE(review): only output elements that are themselves an AggregateExpression are
        // inspected here; aggregates nested inside another expression are not. Confirm this is
        // the intended check.
        val isAggEmR = gb_2q.outputList.collect {
          case agg: aggregate.AggregateExpression =>
            gb_2a.outputList.exists(_.semanticEquals(agg))
        }.forall(identity)

        // Same truth table as `(!needRegrouping && isAggEmR) || needRegrouping`.
        if (isGroupingEdR && canPullUp && (needRegrouping || isAggEmR)) {
          // pull up
          val pullUpOutputList = gb_2a.outputList.map(_.toAttribute) ++ rejoinOutputList
          // Aliased outputs of the subsumee, computed once for all subsumer outputs.
          val queryAliases = gb_2q.outputList.collect { case a: Alias => a }
          // Subsumer outputs that also appear (directly or through a matching alias child) in
          // the subsumee output.
          val myOutputList = gb_2a.outputList.filter {
            case alias: Alias =>
              queryAliases.exists(_.child.semanticEquals(alias.child)) ||
              isExpressionMatches(alias.child, queryAliases)
            case attr: Attribute =>
              gb_2q.outputList.exists(_.semanticEquals(attr))
            // Any other kind of output cannot be matched; drop it instead of raising MatchError.
            case _ => false
          }.map(_.toAttribute) ++ rejoinOutputList
          // TODO: find out if we really need to check needRegrouping or just use myOutputList
          val sel_2c1 = sel_1c1.copy(
            outputList = if (needRegrouping) pullUpOutputList else myOutputList,
            inputList = pullUpOutputList,
            // Every Select child of the compensation is replaced by the subsumer GroupBy.
            children = sel_1c1.children.map {
              case _: modular.Select => gb_2a
              case other => other
            })

          if (rejoinOutputList.isEmpty) {
            val aliasMap = AttributeMap(gb_2a.outputList.collect {
              case a: Alias => (a.toAttribute, a)
            })
            tryMatch(gb_2a, gb_2q, aliasMap).flatMap {
              case g: GroupBy =>
                // Check any agg function exists on output list,
                // in case of expressions like sum(a)+sum(b) ,
                // output list directly replaces with alias with in place of function
                // so we should remove the group by clause in those cases.
                val aggFunExists = g.outputList.exists { out =>
                  out.find {
                    case _: AggregateExpression => true
                    case _ => false
                  }.isDefined
                }
                val regrouped = if (aggFunExists) {
                  g.copy(child = sel_2c1)
                } else {
                  // Remove group by clause.
                  g.copy(child = sel_2c1, predicateList = Seq.empty)
                }
                Some(regrouped)
              case _ => None
            }.map { wip =>
              factorOutSubsumer(wip, gb_2a, sel_1c1.aliasMap)
            }.toList
          }
          // TODO: implement regrouping with 1:N rejoin (rejoin tables being the "1" side)
          // via catalog service
          else if (!needRegrouping && isAggEmR) {
            Seq(factorOutSubsumer(sel_2c1, gb_2a, sel_1c1.aliasMap))
          } else {
            Nil
          }
        } else {
          Nil
        }

      case _ => Nil
    }
  }