in integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala [1445:1562]
/**
 * Attempts to rewrite `subsumee` on top of `subsumer` when both are Selects over a single
 * GroupBy child, the subsumer Select has no predicates, and the child-level match produced a
 * GroupBy `compensation`.
 *
 * The rewrite is produced only when all of the following hold (checked in this order):
 *  - the subsumer reads no leaf relation that the subsumee does not (`extrajoin` is empty);
 *  - every subsumer predicate appears in the subsumee or in the compensation GroupBy;
 *  - every subsumee predicate is present in the subsumer or derivable from the subsumer's
 *    output plus the output of relations the subsumee must re-join;
 *  - every (de-duplicated) subsumee output expression is derivable from that same list;
 *  - the compensation's child is a Select whose predicates are derivable, and the
 *    compensation's own predicates are derivable.
 *
 * @param subsumer     plan that may subsume `subsumee`
 * @param subsumee     plan being rewritten
 * @param compensation compensation for the children; this pattern applies only to a GroupBy
 * @param generator    not used by this pattern
 * @return a single rewritten plan, or `Nil` when the pattern does not apply
 */
def apply(
    subsumer: ModularPlan,
    subsumee: ModularPlan,
    compensation: Option[ModularPlan],
    generator: SubqueryNameGenerator): Seq[ModularPlan] = {
  (subsumer, subsumee, compensation, subsumer.children, subsumee.children) match {
    case (
      sel_3a@modular.Select(
        _, _, Nil, _, _,
        Seq(_@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _),
      sel_3q_dup@modular.Select(
        _, _, _, _, _,
        Seq(_@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _),
      Some(gb_2c@modular.GroupBy(_, _, _, _, _, _, _, _)),
      _ :: Nil,
      _ :: Nil) =>
      // Rendering whole plan trees is costly; only do it when debug output is enabled.
      if (LOGGER.isDebugEnabled) {
        LOGGER.debug(s"Applying pattern: {SelectSelectGroupbyChildDelta} for the plan: " +
          s"{ ${ subsumee.toString().trim } }. " +
          s"Current Subsumer: { ${ subsumer.toString().trim } }. " +
          s"Compensation: { ${ gb_2c.toString().trim } }")
      }
      val tbls_sel_3a = sel_3a.collect { case tbl: modular.LeafNode => tbl }
      val tbls_sel_3q = sel_3q_dup.collect { case tbl: modular.LeafNode => tbl }
      // The checks below use a de-duplicated output list; the final rewrite still projects
      // sel_3q_dup's original (possibly duplicated) output list.
      val sel_3q = sel_3q_dup.copy(outputList = getDistinctOutputList(sel_3q_dup.outputList))
      // Leaf relations only the subsumer reads, and those the subsumee must join back in.
      val extrajoin = tbls_sel_3a.filterNot(tbls_sel_3q.contains)
      val rejoin = tbls_sel_3q.filterNot(tbls_sel_3a.contains)
      // Expressions a subsumee expression may be derived from. This is invariant across all
      // the checks, so it is built once.
      val derivableFrom = sel_3a.outputList ++ rejoin.flatMap(_.output)

      // Lazy so that evaluation stops at the first failing condition in `canRewrite`.
      // They are still evaluated in the same order as the eager original.
      lazy val isPredicateRmE = sel_3a.predicateList.forall(expr =>
        sel_3q.predicateList.exists(_.semanticEquals(expr)) ||
          isExpressionMatches(expr, sel_3q.predicateList) ||
          gb_2c.predicateList.exists(_.semanticEquals(expr)) ||
          isExpressionMatches(expr, gb_2c.predicateList))
      lazy val isPredicateEmdR = sel_3q.predicateList.forall(expr =>
        sel_3a.predicateList.exists(_.semanticEquals(expr)) ||
          isExpressionMatches(expr, sel_3a.predicateList) ||
          isDerivable(expr, derivableFrom, sel_3q, sel_3a, compensation))
      lazy val isOutputEdR =
        sel_3q.outputList.forall(isDerivable(_, derivableFrom, sel_3q, sel_3a, compensation))
      lazy val canSELPullUp = gb_2c.child match {
        case sel: Select =>
          sel.predicateList.forall(isDerivable(_, derivableFrom, sel_3q, sel_3a, compensation))
        case _ => false
      }
      lazy val canGBPullUp =
        gb_2c.predicateList.forall(isDerivable(_, derivableFrom, sel_3q, sel_3a, compensation))

      val canRewrite = extrajoin.isEmpty &&
        isPredicateRmE &&
        isPredicateEmdR &&
        isOutputEdR &&
        canSELPullUp &&
        canGBPullUp

      gb_2c.child match {
        case s: Select if canRewrite =>
          // Replace the GroupBy under the compensation's Select with the subsumer (marked skip).
          val sel_3c1 = s.withNewChildren(
            s.children.map {
              case _: GroupBy => sel_3a.setSkip()
              case other => other
            })
          val gb_3c2 = gb_2c.copy(child = sel_3c1)
          // Substitute attributes produced by the compensation's aliases with those aliases.
          // The Alias is inserted wrapped in AliasWrapper and unwrapped in a second pass
          // (presumably so the inserted Alias is not rewritten again in the same pass).
          val aliasMap_exp = AttributeMap(
            gb_2c.outputList.collect {
              case a: Alias => (a.toAttribute, AliasWrapper(a))
            })
          val sel_3q_exp = sel_3q_dup.transformExpressions({
            case attr: Attribute if aliasMap_exp.contains(attr) => aliasMap_exp(attr)
          }).transformExpressions {
            case AliasWrapper(alias: Alias) => alias
          }
          // Pair each subsumee output with the compensation output that is semantically equal
          // (or matched by isExpressionMatches). Otherwise fall back to the compensation output
          // at the same position.
          // NOTE(review): the positional fallback assumes gb_2c.outputList is at least as long
          // as sel_3q_exp.outputList; confirm this invariant holds for duplicated projections.
          val mappings = sel_3q_exp.outputList.zipWithIndex.map { case (exp, index) =>
            val matched = gb_2c.outputList.find {
              case a: Alias if exp.isInstanceOf[Alias] =>
                a.child.semanticEquals(exp.children.head) ||
                  isExpressionMatches(a.child, exp.children.head)
              case a: Alias => a.child.semanticEquals(exp)
              case other => other.semanticEquals(exp)
            }
            (exp, matched.getOrElse(gb_2c.outputList(index)))
          }
          val oList = CarbonToSparkAdapter.createAliases(mappings)
          val wip = sel_3q_exp.copy(outputList = oList, children = Seq(gb_3c2))
          Seq(factorOutSubsumer(wip, sel_3a, s.aliasMap))
        case _ => Nil
      }
    case _ => Nil
  }
}