in integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala [1082:1190]
/**
 * Tries the "GroupbyGroupbySelectOnlyChildDelta" rewrite.
 *
 * The rewrite is attempted only when `subsumer` and `subsumee` are both `GroupBy` nodes
 * without the `EXPAND` flag and `compensation` holds a `Select`. The compensation select is
 * re-parented onto the subsumer (its `Select` children are replaced by the subsumer group-by)
 * and, depending on whether re-join outputs exist, either wrapped by the result of `tryMatch`
 * or returned directly, in both cases after `factorOutSubsumer`.
 *
 * @param subsumer     candidate plan that should cover the subsumee
 * @param subsumee     plan to be rewritten on top of the subsumer
 * @param compensation optional compensation plan; must be `Some(Select)` for this pattern
 * @param generator    not referenced by this method body
 * @return a single-element sequence with the rewritten plan, or `Nil` when the pattern does
 *         not apply
 */
def apply(
    subsumer: ModularPlan,
    subsumee: ModularPlan,
    compensation: Option[ModularPlan],
    generator: SubqueryNameGenerator): Seq[ModularPlan] = {
  // Every element of subsumee.expressions that is itself an AggregateExpression must have its
  // AttributeSet contained in the subsumer's output set.
  val aggInputsCovered = subsumee.expressions.forall {
    case agg: aggregate.AggregateExpression =>
      AttributeSet(Seq(agg)).subsetOf(subsumer.outputSet)
    case _ => true
  }
  // NOTE(review): this compares the node classes (Class objects) with the value `GroupBy`,
  // presumably the case-class companion, so it looks like it can never match and the flag is
  // effectively always true. Confirm the intent before switching to classOf[GroupBy]: a real
  // check would also see GroupBy nodes below the subsumer subtree that is replaced further down.
  val compensationSelectOnly = compensation.forall { plan =>
    !plan.collect { case node => node.getClass }.contains(GroupBy)
  }

  (subsumer, subsumee, compensation) match {
    case (
      subsumerGb @ GroupBy(_, _, _, _, _, _, _, _),
      subsumeeGb @ GroupBy(_, _, _, _, _, _, _, _),
      Some(compSelect @ Select(_, _, _, _, _, _, _, _, _, _)))
      if aggInputsCovered && compensationSelectOnly &&
         !subsumeeGb.flags.hasFlag(EXPAND) && !subsumerGb.flags.hasFlag(EXPAND) =>
      LOGGER.debug(s"Applying pattern: {GroupbyGroupbySelectOnlyChildDelta} for the plan: " +
                   s"{ ${ subsumee.toString().trim } }. " +
                   s"Current Subsumer: { ${ subsumer.toString().trim } }. " +
                   s"Compensation: { ${ compSelect.toString().trim } }")

      // Outputs of every compensation child except the first one.
      val rejoinOutputList = compSelect.children.tail.flatMap(_.output)
      // Expressions a subsumee expression may be derived from.
      val derivationSources = subsumerGb.predicateList ++ rejoinOutputList

      val groupingDerivable = subsumeeGb.predicateList.forall { expr =>
        isDerivable(expr, derivationSources, subsumeeGb, subsumerGb, compensation)
      }
      // Regrouping is needed as soon as one subsumer grouping expression is neither contained
      // in nor matched by the subsumee grouping expressions.
      val needRegrouping = subsumerGb.predicateList.exists { key =>
        !subsumeeGb.predicateList.contains(key) &&
        !isExpressionMatches(key, subsumeeGb.predicateList)
      }
      val predicatesPullable = compSelect.predicateList.forall { expr =>
        isDerivable(expr, derivationSources, subsumeeGb, subsumerGb, compensation)
      }
      // NOTE(review): only output elements that are themselves AggregateExpression are
      // checked here (aggregates nested inside an Alias are not inspected) - confirm intent.
      val aggregatesMatch = subsumeeGb.outputList.forall {
        case agg: aggregate.AggregateExpression =>
          subsumerGb.outputList.exists(_.semanticEquals(agg))
        case _ => true
      }

      val rewritable = groupingDerivable && (needRegrouping || aggregatesMatch) &&
                       predicatesPullable
      if (!rewritable) {
        Nil
      } else {
        // pull up
        val pullUpOutputList = subsumerGb.outputList.map(_.toAttribute) ++ rejoinOutputList
        val subsumeeAliases = subsumeeGb.outputList.filter(_.isInstanceOf[Alias])
        // Subsumer outputs that the subsumee also produces, plus the re-join outputs.
        // NOTE(review): only Alias and Attribute are handled; any other kind of output
        // expression raises a MatchError here - presumably unreachable, confirm.
        val myOutputList = subsumerGb.outputList.filter {
          case alias: Alias =>
            subsumeeAliases.exists {
              case other: Alias => other.child.semanticEquals(alias.child)
              case _ => false
            } || isExpressionMatches(alias.child, subsumeeAliases)
          case attr: Attribute =>
            subsumeeGb.outputList.exists(_.semanticEquals(attr))
        }.map(_.toAttribute) ++ rejoinOutputList

        // Hang the compensation select on top of the subsumer group-by.
        val rewiredChildren = compSelect.children.map {
          case _: modular.Select => subsumerGb
          case other => other
        }
        // TODO: find out if we really need to check needRegrouping or just use myOutputList
        val pulledUpSelect = compSelect.copy(
          outputList = if (needRegrouping) pullUpOutputList else myOutputList,
          inputList = pullUpOutputList,
          children = rewiredChildren)

        if (rejoinOutputList.isEmpty) {
          val aliasMap = AttributeMap(subsumerGb.outputList.collect {
            case a: Alias => (a.toAttribute, a)
          })
          tryMatch(subsumerGb, subsumeeGb, aliasMap).collect {
            case g: GroupBy =>
              // For outputs such as sum(a)+sum(b) the aggregate functions have already been
              // replaced by aliases in the output list; when no aggregate function is left,
              // the group by clause is removed.
              val hasAggregate = g.outputList.exists { out =>
                out.find {
                  case _: AggregateExpression => true
                  case _ => false
                }.nonEmpty
              }
              if (hasAggregate) {
                g.copy(child = pulledUpSelect)
              } else {
                g.copy(child = pulledUpSelect, predicateList = Seq.empty)
              }
          }.map { wip =>
            factorOutSubsumer(wip, subsumerGb, compSelect.aliasMap)
          }.toList
        } else if (!needRegrouping && aggregatesMatch) {
          // TODO: implement regrouping with 1:N rejoin (rejoin tables being the "1" side)
          // via catalog service
          Seq(factorOutSubsumer(pulledUpSelect, subsumerGb, compSelect.aliasMap))
        } else {
          Nil
        }
      }
    case _ => Nil
  }
}