in integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala [887:983]
/**
 * Matches a group-by `subsumee` against a group-by `subsumer` when no compensation plan is
 * supplied (logged as pattern `GroupbyGroupbyNoChildDelta`).
 *
 * Outcomes:
 *  - Both plans' `predicateList`s cover each other and `compareOutputList` accepts the
 *    outputs: a copy of the subsumer whose output names follow the subsumee.
 *  - Only the subsumee's `predicateList` is covered by the subsumer's: the result of
 *    `tryMatch`, re-parented onto the subsumer and passed through `factorOutSubsumer`.
 *  - Anything else (including non-GroupBy inputs or a defined `compensation`): `Nil`.
 *
 * @param subsumer     plan that is expected to cover the subsumee
 * @param subsumee     plan to be rewritten on top of the subsumer
 * @param compensation must be `None` for this pattern to apply
 * @param generator    source of the subsumer name (`newSubsumerName()`); it is invoked
 *                     only on the partial-match path
 * @return zero or one rewritten plan
 */
def apply(
    subsumer: ModularPlan,
    subsumee: ModularPlan,
    compensation: Option[ModularPlan],
    generator: SubqueryNameGenerator): Seq[ModularPlan] = {
  (subsumer, subsumee, compensation) match {
    case (
      gb_2a @ modular.GroupBy(_, _, _, _, _, _, _, _),
      gb_2q @ modular.GroupBy(_, _, _, _, _, _, _, _),
      None) =>
      LOGGER.debug(s"Applying pattern: {GroupbyGroupbyNoChildDelta} for the plan: " +
                   s"{ ${ subsumee.toString().trim } }. " +
                   s"Current Subsumer: { ${ subsumer.toString().trim } }")
      // E-minus-R: every predicateList entry of the subsumee has a counterpart in the subsumer.
      val isGroupingEmR = gb_2q.predicateList.forall { expr =>
        gb_2a.predicateList.exists(_.semanticEquals(expr)) ||
        isExpressionMatches(expr, gb_2a.predicateList)
      }
      // R-minus-E: the same check in the opposite direction.
      val isGroupingRmE = gb_2a.predicateList.forall { expr =>
        gb_2q.predicateList.exists(_.semanticEquals(expr)) ||
        isExpressionMatches(expr, gb_2q.predicateList)
      }
      val isOutputEmR = compareOutputList(gb_2q.outputList, gb_2a.outputList)

      if (isGroupingEmR && isGroupingRmE) {
        if (isOutputEmR) {
          // Pair each subsumer output with the first semantically matching subsumee output;
          // an output without a counterpart is paired with itself and thus left unchanged.
          val mappings = gb_2a.outputList.map { exp =>
            val counterpart = gb_2q.outputList.find {
              case a: Alias if exp.isInstanceOf[Alias] =>
                a.child.semanticEquals(exp.children.head) ||
                isExpressionMatches(a.child, exp.children.head) ||
                a.sql.equalsIgnoreCase(exp.sql)
              case a: Alias => a.child.semanticEquals(exp)
              case other => exp match {
                case alias: Alias => other.semanticEquals(alias.child)
                case _ => other.semanticEquals(exp)
              }
            }
            (exp, counterpart.getOrElse(exp))
          }
          // Re-alias subsumer outputs whose name differs from the matched subsumee output.
          val oList = mappings.map { case (out1, out2) =>
            if (out1.name == out2.name) {
              out1
            } else {
              out1 match {
                case alias: Alias => Alias(alias.child, out2.name)(exprId = alias.exprId)
                case _ => Alias(out1, out2.name)(exprId = out2.exprId)
              }
            }
          }
          Seq(gb_2a.copy(outputList = oList))
        } else {
          Nil
        }
      } else if (isGroupingEmR) {
        // Reaching here means isGroupingRmE is false: the subsumer has extra predicateList
        // entries that the subsumee lacks.
        val aliasMap = AttributeMap(gb_2a.outputList.collect { case a: Alias =>
          (a.toAttribute, a)
        })
        val subsumerName = Map(0 -> generator.newSubsumerName())
        tryMatch(gb_2a, gb_2q, aliasMap).flatMap {
          case g: GroupBy =>
            // Check any agg function exists on output list, in case of expressions like
            // sum(a), then create new alias and copy to group by node
            val aggFunExists = g.outputList.exists { f =>
              f.find(_.isInstanceOf[AggregateExpression]).isDefined
            }
            if (aggFunExists && isOutputEmR) {
              // NOTE(review): assumes the matched group-by always sits on a Select; any other
              // child makes this cast throw ClassCastException - confirm the invariant.
              val sel_1a = g.child.asInstanceOf[Select]
              val sel_1q_temp = sel_1a.copy(children = Seq(gb_2a), aliasMap = subsumerName)
              Some(g.copy(child = sel_1q_temp))
            } else {
              // Swap every Select directly below the matched child for the subsumer.
              val rewiredChild = g.child.withNewChildren(
                g.child.children.map {
                  case modular.Select(_, _, _, _, _, _, _, _, _, _) => gb_2a
                  case other => other
                })
              Some(g.copy(child = rewiredChild))
            }
          case _ => None
        }.map { wip =>
          factorOutSubsumer(wip, gb_2a, subsumerName)
        }.toList
      } else {
        Nil
      }
    case _ => Nil
  }
}