in integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala [305:601]
/**
 * Tries to rewrite the expressions of the query operator `operator_q` in terms of the outputs of
 * the subsumer operator `operator_a`.
 *
 * Aggregates are re-derived from subsumer outputs that are bound (through `alias_m`) to a
 * compatible aggregate with the same distinctness and semantically equal inputs:
 *  - count(..) becomes sum(cnt) over a matching subsumer count column;
 *  - sum(e), max(e), min(e) become sum/max/min over the matching subsumer column;
 *  - avg(e) becomes sum(s) / sum(cnt) when the subsumer has a matching count and sum, or
 *    sum(a * cnt) / sum(cnt) when it has a matching count and avg; failing both, it becomes
 *    avg over a matching subsumer avg column.
 * Any other aggregate makes the whole match fail. A non-aggregate expression is replaced by the
 * subsumer attribute that computes an equivalent non-aggregate expression, or is kept unchanged.
 *
 * @param operator_a the subsumer whose output list is searched
 * @param operator_q the operator whose expressions are rewritten
 * @param alias_m    maps subsumer output attributes to the Alias defining each of them
 * @return the rewritten operator, or None if at least one aggregate could not be derived
 */
private def doMatch(
    operator_a: modular.Matchable,
    operator_q: modular.Matchable,
    alias_m: AttributeMap[Alias]): Option[modular.Matchable] = {

  // The aggregate expression that a subsumer output entry (Alias or Attribute) is bound to via
  // alias_m; None for other entries, unbound entries, or non-aggregate definitions.
  def aggregateOf(out: Expression): Option[AggregateExpression] = {
    val definition = out match {
      case alias: Alias => alias_m.get(alias.toAttribute)
      case attr: Attribute => alias_m.get(attr)
      case _ => None
    }
    definition.map(_.child).collect { case agg: AggregateExpression => agg }
  }

  // First subsumer output entry whose bound aggregate satisfies `accept`.
  def findAggregate(accept: AggregateExpression => Boolean) =
    operator_a.outputList.find(out => aggregateOf(out).exists(accept))

  // True when both aggregates have the same distinctness and semantically equal inputs.
  // Inputs are compared order-insensitively (Count may take several); ordering by semanticHash
  // lines up semantically equal inputs even when they differ cosmetically (e.g. qualifiers).
  def sameInputs(a: AggregateExpression, q: AggregateExpression): Boolean = {
    val exprs_a = a.aggregateFunction.children
    val exprs_q = q.aggregateFunction.children
    a.isDistinct == q.isDistinct && exprs_a.length == exprs_q.length &&
      exprs_a.sortBy(_.semanticHash()).zip(exprs_q.sortBy(_.semanticHash()))
        .forall { case (x, y) => x.semanticEquals(y) }
  }

  // Cleared by any expression that cannot be derived; never set back to true, so a failure
  // anywhere in the traversal fails the whole match.
  var matchable = true
  val matched = operator_q.transformExpressions {
    case cnt_q: AggregateExpression if cnt_q.aggregateFunction.isInstanceOf[Count] =>
      // A count rolls up as the sum of the subsumer's count column.
      findAggregate(cnt_a =>
        cnt_a.aggregateFunction.isInstanceOf[Count] && sameInputs(cnt_a, cnt_q))
        .map(cnt => cnt_q.copy(Sum(cnt.toAttribute), isDistinct = false))
        .getOrElse { matchable = false; cnt_q }

    case sum_q: AggregateExpression if sum_q.aggregateFunction.isInstanceOf[Sum] =>
      findAggregate(sum_a =>
        sum_a.aggregateFunction.isInstanceOf[Sum] && sameInputs(sum_a, sum_q))
        .map(sum => sum_q.copy(Sum(sum.toAttribute), isDistinct = false))
        .getOrElse { matchable = false; sum_q }

    case max_q: AggregateExpression if max_q.aggregateFunction.isInstanceOf[Max] =>
      findAggregate(max_a =>
        max_a.aggregateFunction.isInstanceOf[Max] && sameInputs(max_a, max_q))
        .map(max => max_q.copy(Max(max.toAttribute), isDistinct = false))
        .getOrElse { matchable = false; max_q }

    case min_q: AggregateExpression if min_q.aggregateFunction.isInstanceOf[Min] =>
      findAggregate(min_a =>
        min_a.aggregateFunction.isInstanceOf[Min] && sameInputs(min_a, min_q))
        .map(min => min_q.copy(Min(min.toAttribute), isDistinct = false))
        .getOrElse { matchable = false; min_q }

    case avg_q: AggregateExpression if avg_q.aggregateFunction.isInstanceOf[Average] =>
      val expr_q = avg_q.aggregateFunction.children.head
      // Denominator: a subsumer count over exactly the averaged input, with the same
      // distinctness as the query's avg (a plain count must not divide a distinct sum).
      val cnt_q = findAggregate { cnt_a =>
        val inputs = cnt_a.aggregateFunction.children
        cnt_a.aggregateFunction.isInstanceOf[Count] &&
          cnt_a.isDistinct == avg_q.isDistinct &&
          inputs.length == 1 && inputs.head.semanticEquals(expr_q)
      }.map(cnt => Sum(cnt.toAttribute))
      // Numerator: a matching subsumer sum, or a matching avg re-weighted by the count.
      // NOTE(review): as before, the quotient is built from bare Sum functions rather than
      // AggregateExpressions; presumably a later step wraps them - confirm.
      val derivative = cnt_q.flatMap { cnt =>
        findAggregate { agg =>
          (agg.aggregateFunction.isInstanceOf[Sum] ||
            agg.aggregateFunction.isInstanceOf[Average]) && sameInputs(agg, avg_q)
        }.map { sum_or_avg =>
          val accu = alias_m(sum_or_avg.toAttribute).child.asInstanceOf[AggregateExpression]
            .aggregateFunction match {
              case _: Sum => Sum(sum_or_avg.toAttribute)
              case _ => Sum(Multiply(sum_or_avg.toAttribute, Cast(cnt, sum_or_avg.dataType)))
            }
          Divide(accu, Cast(cnt, accu.dataType))
        }
      }
      // Without a derivation, fall back to an Average the subsumer already computes over the
      // same input. Only this final failure touches `matchable`, so failures recorded for
      // earlier expressions are preserved.
      derivative.getOrElse {
        findAggregate(avg_a =>
          avg_a.aggregateFunction.isInstanceOf[Average] && sameInputs(avg_a, avg_q))
          .map(avg => avg_q.copy(Average(avg.toAttribute), isDistinct = false))
          .getOrElse { matchable = false; avg_q }
      }

    case other: AggregateExpression =>
      // No rollup rule exists for this aggregate function.
      matchable = false
      other

    case expr: Expression if !expr.isInstanceOf[AggregateFunction] =>
      // Replace a non-aggregate expression with the subsumer column computing it, if any;
      // otherwise keep it (transformation then continues into its children).
      operator_a.outputList.find {
        case alias: Alias if alias_m.contains(alias.toAttribute) &&
          (alias_m(alias.toAttribute).child.semanticEquals(expr) ||
            isExpressionMatches(alias_m(alias.toAttribute), expr)) &&
          !alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] => true
        case attr: Attribute if alias_m.contains(attr) &&
          alias_m(attr).child.semanticEquals(expr) &&
          !alias_m(attr).child.isInstanceOf[AggregateExpression] => true
        case _ => false
      }.map(_.toAttribute)
        .getOrElse { expr }
  }

  if (matchable) Some(matched) else None
}