in integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala [660:879]
/**
 * Attempts to match a `Select` subsumee against a `Select` subsumer when no compensation
 * plan has been supplied.
 *
 * Two shapes are handled:
 *  - both selects have only `modular.LeafNode` children;
 *  - both selects have only `GroupBy` children.
 *
 * @param subsumer     the plan the subsumee is matched against
 * @param subsumee     the plan to be matched
 * @param compensation must be `None` for either shape to apply
 * @param generator    supplies the alias used for the subsumer when the subsumee is rewritten
 * @return `Seq(subsumer)` (with the subsumee's flags copied over if it has a flag spec) when
 *         the two plans match without further work; a single rewritten plan produced by
 *         `factorOutSubsumer` when the subsumee has to be re-expressed on top of the subsumer;
 *         `Nil` when no match is possible.
 */
def apply(subsumer: ModularPlan,
    subsumee: ModularPlan,
    compensation: Option[ModularPlan],
    generator: SubqueryNameGenerator): Seq[ModularPlan] = {
  (subsumer, subsumee, compensation) match {
    case (
      sel_1a @ modular.Select(_, _, _, _, _, _, _, _, _, _),
      sel_1q @ modular.Select(_, _, _, _, _, _, _, _, _, _), None
      ) if sel_1a.children.forall { _.isInstanceOf[modular.LeafNode] } &&
           sel_1q.children.forall { _.isInstanceOf[modular.LeafNode] } =>
      LOGGER.debug(s"Applying pattern: {SelectSelectNoChildDelta} for the plan: " +
                   s"{ ${ subsumee.toString().trim } }. " +
                   s"Current Subsumer: { ${ subsumer.toString().trim } }. ")
      // Children (including harmonized relations) of subsumer and subsumee are assumed to
      // be in 1-1 correspondence.
      // Change the following two conditions to more complicated ones if we want to
      // consider things that combine extrajoin, rejoin, and harmonized relations.
      //
      // Despite the "is" prefix these are collections: the subsumer (resp. subsumee)
      // children that do NOT have exactly one counterpart on the other side. Empty means
      // every child is matched exactly once.
      val isUniqueRmE = subsumer.children.filter { x => subsumee.children.count{
        case relation: ModularRelation => relation.fineEquals(x)
        case other => other == x
      } != 1 }
      val isUniqueEmR = subsumee.children.filter { x => subsumer.children.count{
        case relation: ModularRelation => relation.fineEquals(x)
        case other => other == x
      } != 1 }
      // extrajoin: subsumer children missing from the subsumee.
      // rejoin: subsumee children missing from the subsumer.
      val extrajoin = sel_1a.children.filterNot { child => sel_1q.children.contains(child) }
      val rejoin = sel_1q.children.filterNot { child => sel_1a.children.contains(child) }
      val rejoinOutputList = rejoin.flatMap(_.output)
      // Every subsumer predicate must have a counterpart among the subsumee predicates.
      val isPredicateRmE = sel_1a.predicateList.forall(expr =>
        sel_1q.predicateList.exists(_.semanticEquals(expr)) ||
        isExpressionMatches(expr, sel_1q.predicateList))
      // Every subsumee predicate must be derivable from the subsumer output plus the
      // output of the rejoined children.
      val isPredicateEmdR = sel_1q.predicateList.forall(expr =>
        isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None))
      // `forall` is vacuously true on an empty list, so an empty subsumee output list is
      // rejected explicitly before checking derivability.
      val isOutputEdR = sel_1q.outputList.nonEmpty && sel_1q.outputList.forall(expr =>
        isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None))
      if (isUniqueRmE.isEmpty && isUniqueEmR.isEmpty && extrajoin.isEmpty && isPredicateRmE &&
          isPredicateEmdR && isOutputEdR) {
        // Pairs of (subsumee child index -> subsumer child index). The guarded partial
        // function cannot miss here because `extrajoin.isEmpty` was checked above, i.e.
        // every subsumer child is contained in the subsumee children.
        val mappings = sel_1a.children.zipWithIndex.map {
          case (child, fromIndex) if sel_1q.children.contains(child) =>
            val toIndex = sel_1q.children.indexWhere{
              case relation: ModularRelation => relation.fineEquals(child)
              case other => other == child
            }
            (toIndex -> fromIndex)
        }
        val e2r = mappings.toMap
        val r2e = e2r.map(_.swap)
        // Every subsumer join edge must map onto an equivalent subsumee join edge with
        // mutually matching join conditions.
        val r2eJoinsMatch = sel_1a.joinEdges.forall { x =>
          (r2e.get(x.left), r2e.get(x.right)) match {
            case (Some(l), Some(r)) =>
              val mappedEdge = JoinEdge(l, r, x.joinType)
              val joinTypeEquivalent =
                if (sel_1q.joinEdges.contains(mappedEdge)) {
                  true
                } else {
                  x.joinType match {
                    // Inner and full outer joins are also accepted with swapped sides.
                    case Inner | FullOuter =>
                      sel_1q.joinEdges.contains(JoinEdge(r, l, x.joinType))
                    // A left-join view is also accepted against an inner join on the
                    // same pair of children, in either direction.
                    case LeftOuter if isLeftJoinView(sel_1a) =>
                      sel_1q.joinEdges.contains(JoinEdge(l, r, Inner)) ||
                      sel_1q.joinEdges.contains(JoinEdge(r, l, Inner))
                    case _ => false
                  }
                }
              if (joinTypeEquivalent) {
                val sel_1a_join = sel_1a.extractJoinConditions(
                  sel_1a.children(x.left),
                  sel_1a.children(x.right))
                val sel_1q_join = sel_1q.extractJoinConditions(
                  sel_1q.children(mappedEdge.left),
                  sel_1q.children(mappedEdge.right))
                sel_1a_join.forall(e => sel_1q_join.exists(e.semanticEquals) ||
                                        isExpressionMatches(e, sel_1q_join)) &&
                sel_1q_join.forall(e => sel_1a_join.exists(e.semanticEquals) ||
                                        isExpressionMatches(e, sel_1a_join))
              } else false
            case _ => false
          }
        }
        val isPredicateEmR = sel_1q.predicateList.forall(expr =>
          sel_1a.predicateList.exists(_.semanticEquals(expr)) ||
          isExpressionMatches(expr, sel_1a.predicateList))
        val isOutputEmR = compareOutputList(sel_1q.outputList, sel_1a.outputList)
        val isOutputRmE = compareOutputList(sel_1a.outputList, sel_1q.outputList)
        // True when the subsumer is a left-join view while the subsumee's first join edge
        // is an inner join. `headOption` is used because this is evaluated before the
        // `r2eJoinsMatch` guard below, so the subsumee may have no join edges at all;
        // `head` would then throw instead of letting the match fall through to Nil.
        val innerJoinOnLeftJoinView =
          isLeftJoinView(sel_1a) && sel_1q.joinEdges.headOption.exists(_.joinType == Inner)
        val isLOEmLOR = !innerJoinOnLeftJoinView
        if (r2eJoinsMatch) {
          if (isPredicateEmR && isOutputEmR && isOutputRmE && rejoin.isEmpty && isLOEmLOR) {
            // Exact match: the subsumer can stand in for the subsumee as is, only the
            // subsumee's flags (if any) are carried over.
            if (sel_1q.flagSpec.isEmpty) {
              Seq(sel_1a)
            } else {
              Seq(sel_1a.copy(flags = sel_1q.flags, flagSpec = sel_1q.flagSpec))
            }
          } else {
            // Compensation is needed: rebuild the subsumee on top of the subsumer, keeping
            // the subsumee children that have no counterpart in the subsumer.
            val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
            val tAliasMap = new collection.mutable.HashMap[Int, String]()
            val usel_1a = sel_1a.copy(outputList = sel_1a.outputList)
            // The subsumer is added first, so it sits at index 0 of the new children.
            tChildren += usel_1a
            tAliasMap += (tChildren.indexOf(usel_1a) -> generator.newSubsumerName())
            sel_1q.children.zipWithIndex.foreach {
              case (child, idx) =>
                if (e2r.get(idx).isEmpty) {
                  tChildren += child
                  sel_1q.aliasMap.get(idx).foreach(x => tAliasMap += (tChildren.indexOf(child) -> x))
                }
            }
            // Re-target the subsumee join edges at the new children. A side that was
            // covered by the subsumer now points at index 0; edges whose both sides were
            // covered by the subsumer are dropped.
            val tJoinEdges = sel_1q.joinEdges.flatMap { edge =>
              (e2r.get(edge.left), e2r.get(edge.right)) match {
                case (Some(_), None) =>
                  Some(JoinEdge(
                    0,
                    tChildren.indexOf(sel_1q.children(edge.right)),
                    edge.joinType))
                case (None, None) =>
                  Some(JoinEdge(
                    tChildren.indexOf(sel_1q.children(edge.left)),
                    tChildren.indexOf(sel_1q.children(edge.right)),
                    edge.joinType))
                case (None, Some(_)) =>
                  Some(JoinEdge(
                    tChildren.indexOf(sel_1q.children(edge.left)),
                    0,
                    edge.joinType))
                case _ =>
                  None
              }
            }
            // Keep only the subsumee predicates the subsumer does not already apply. For
            // an inner-join subsumee over a left-join view, additionally require the tag
            // of the subsumer's second child to be non-null.
            val tPredicateList = sel_1q.predicateList.filter { p =>
              !sel_1a.predicateList.exists(_.semanticEquals(p))
            } ++ (if (innerJoinOnLeftJoinView) {
              sel_1a.children(1)
                .asInstanceOf[HarmonizedRelation].tag.map(IsNotNull(_)).toSeq
            } else {
              Seq.empty
            })
            val sel_1q_temp = sel_1q.copy(
              predicateList = tPredicateList,
              children = tChildren,
              joinEdges = tJoinEdges,
              aliasMap = tAliasMap.toMap)
            val done = factorOutSubsumer(sel_1q_temp, usel_1a, sel_1q_temp.aliasMap)
            Seq(done)
          }
        } else Nil
      } else Nil
    case (
      sel_3a @ modular.Select(_, _, _, _, _, _, _, _, _, _),
      sel_3q @ modular.Select(_, _, _, _, _, _, _, _, _, _), None)
      if sel_3a.children.forall(_.isInstanceOf[GroupBy]) &&
         sel_3q.children.forall(_.isInstanceOf[GroupBy]) =>
      LOGGER.debug(s"Applying pattern: {SelectSelectNoChildDelta} for the plan: " +
                   s"{ ${ subsumee.toString().trim } }. " +
                   s"Current Subsumer: { ${ subsumer.toString().trim } }")
      // Every subsumer predicate must have a counterpart among the subsumee predicates.
      val isPredicateRmE = sel_3a.predicateList.isEmpty ||
                           sel_3a.predicateList.forall(expr =>
                             sel_3q.predicateList.exists(_.semanticEquals(expr)) ||
                             isExpressionMatches(expr, sel_3q.predicateList))
      // Every subsumee predicate must either match a subsumer predicate or be derivable
      // from the subsumer output.
      val isPredicateEmdR = sel_3q.predicateList.isEmpty ||
                            sel_3q.predicateList.forall(expr =>
                              sel_3a.predicateList.exists(_.semanticEquals(expr) ||
                                                          isExpressionMatches(expr,
                                                            sel_3a.predicateList)) ||
                              isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
      val isOutputEdR = sel_3q.outputList.forall(expr =>
        isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
      val isSingleChild = sel_3a.children.length == 1 && sel_3q.children.length == 1
      if (isPredicateRmE && isPredicateEmdR && isOutputEdR && isSingleChild) {
        val isPredicateEmR = sel_3q.predicateList.isEmpty ||
                             sel_3q.predicateList.forall(expr =>
                               sel_3a.predicateList.exists(_.semanticEquals(expr)) ||
                               isExpressionMatches(expr, sel_3a.predicateList))
        val isOutputRmE = sel_3a.outputList.forall(expr =>
          isDerivable(expr, sel_3q.outputList, sel_3a, sel_3q, None))
        // Same check as isOutputEdR above, kept under the name used by the branches below.
        val isOutputEmR = sel_3q.outputList.forall(expr =>
          isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
        if (isPredicateEmR && isOutputEmR && isOutputRmE) {
          // Predicates and outputs agree in both directions: use the subsumer directly.
          Seq(sel_3a)
        } else if (isPredicateEmR && isOutputEmR) {
          // Outputs differ: re-express the subsumee on top of the subsumer by replacing
          // each alias with the attribute of the subsumer alias whose child matches it.
          // NOTE(review): `.get` throws if no subsumer alias matches; this relies on the
          // derivability checks above always implying such an alias exists - confirm.
          val sel_3q_exp = sel_3q.transformExpressions({
            case a: Alias => sel_3a.outputList
              .find { a1 =>
                a1.isInstanceOf[Alias] &&
                (a1.asInstanceOf[Alias].child.semanticEquals(a.child) ||
                 isExpressionMatches(a1.asInstanceOf[Alias].child, a.child))
              }.map(_.toAttribute).get
          })
          val wip = sel_3q_exp.copy(
            children = Seq(sel_3a),
            aliasMap = Seq(0 -> generator.newSubsumerName()).toMap)
          val done = factorOutSubsumer(wip, sel_3a, wip.aliasMap)
          Seq(done)
        } else {
          Nil
        }
      } else Nil
    case _ => Nil
  }
}