private def doMatch()

in integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala [305:601]


  /**
   * Tries to re-express every expression of `operator_q` over the output columns of
   * `operator_a`.
   *
   * `alias_m` maps an output attribute of `operator_a` to the alias that defines it; a column
   * is considered only when it is an `Alias` or an `Attribute` whose attribute is a key of that
   * map. Rewrites applied:
   *  - `count(..)` becomes `sum(c)`, where `c` is a column defined by a count with the same
   *    DISTINCT flag and semantically equal arguments;
   *  - `sum(x)`, `max(x)`, `min(x)` become `sum(c)`, `max(c)`, `min(c)`, where `c` is defined
   *    by the same aggregate with the same DISTINCT flag over a semantically equal argument;
   *  - `avg(x)` becomes `sum(s) / sum(n)` when a non-distinct `count(x)` column `n` and a
   *    matching `sum(x)` column `s` exist, `sum(a * sum(n)) / sum(n)` when the matching column
   *    `a` is an average instead, and `avg(a)` when only a matching average column exists;
   *  - every other aggregate expression makes the match fail;
   *  - any other expression (except bare aggregate functions) is replaced by the attribute of
   *    the first column whose non-aggregate definition matches it, or left as is.
   *
   * @param operator_a operator whose output columns are the rewrite targets
   * @param operator_q operator whose expressions are rewritten
   * @param alias_m    definitions of the output attributes of `operator_a`
   * @return the rewritten `operator_q`, or `None` if at least one aggregate expression could
   *         not be rewritten
   */
  private def doMatch(
      operator_a: modular.Matchable,
      operator_q: modular.Matchable,
      alias_m: AttributeMap[Alias]): Option[modular.Matchable] = {
    // Failure flag for the whole operator. It is only ever flipped to false: once a single
    // aggregate cannot be rewritten, no later expression may turn the match valid again.
    var matchable = true

    // Aggregate expression that alias_m records as the definition of an output column.
    def aggregateBehind(column: Expression): Option[AggregateExpression] = {
      val key: Option[Attribute] = column match {
        case alias: Alias => Some(alias.toAttribute)
        case attr: Attribute => Some(attr)
        case _ => None
      }
      key.flatMap(attr => alias_m.get(attr)).map(_.child).collect {
        case agg: AggregateExpression => agg
      }
    }

    // First output column of operator_a (in output order) whose definition satisfies `accepts`.
    def findOutput(accepts: AggregateExpression => Boolean) =
      operator_a.outputList.find(column => aggregateBehind(column).exists(accepts))

    // True when agg_a is a count with the DISTINCT flag and (order-insensitive) arguments of cnt_q.
    def sameCount(cnt_q: AggregateExpression, agg_a: AggregateExpression): Boolean =
      agg_a.aggregateFunction match {
        case count_a: Count =>
          val exprs_q = cnt_q.aggregateFunction.children
          val exprs_a = count_a.children
          agg_a.isDistinct == cnt_q.isDistinct && exprs_q.length == exprs_a.length &&
          exprs_a.sortBy(_.hashCode()).zip(exprs_q.sortBy(_.hashCode()))
            .forall(p => p._1.semanticEquals(p._2))
        case _ => false
      }

    // True when `argumentOf` accepts the function of agg_a, the DISTINCT flags of agg_a and
    // agg_q agree, and the extracted argument is semantically equal to expr_q.
    def sameArgument(agg_q: AggregateExpression, expr_q: Expression, agg_a: AggregateExpression)(
        argumentOf: PartialFunction[AggregateFunction, Expression]): Boolean =
      argumentOf.lift(agg_a.aggregateFunction).exists { expr_a =>
        agg_a.isDistinct == agg_q.isDistinct && expr_a.semanticEquals(expr_q)
      }

    // Replaces the function of agg_q by `rollup` over the first accepted column and drops
    // DISTINCT; records a failed match and keeps agg_q when no column is accepted.
    def rollUp(agg_q: AggregateExpression, accepts: AggregateExpression => Boolean)(
        rollup: Attribute => AggregateFunction): Expression =
      findOutput(accepts)
        .map(column => agg_q.copy(rollup(column.toAttribute), isDistinct = false))
        .getOrElse { matchable = false; agg_q }

    val matched = operator_q.transformExpressions {
      case cnt_q: AggregateExpression if cnt_q.aggregateFunction.isInstanceOf[Count] =>
        rollUp(cnt_q, agg_a => sameCount(cnt_q, agg_a))(cnt => Sum(cnt))

      case sum_q: AggregateExpression if sum_q.aggregateFunction.isInstanceOf[Sum] =>
        val expr_q = sum_q.aggregateFunction.children.head
        rollUp(sum_q, agg_a => sameArgument(sum_q, expr_q, agg_a) {
          case sum_a: Sum => sum_a.child
        })(sum => Sum(sum))

      case max_q: AggregateExpression if max_q.aggregateFunction.isInstanceOf[Max] =>
        val expr_q = max_q.aggregateFunction.children.head
        rollUp(max_q, agg_a => sameArgument(max_q, expr_q, agg_a) {
          case max_a: Max => max_a.child
        })(max => Max(max))

      case min_q: AggregateExpression if min_q.aggregateFunction.isInstanceOf[Min] =>
        val expr_q = min_q.aggregateFunction.children.head
        rollUp(min_q, agg_a => sameArgument(min_q, expr_q, agg_a) {
          case min_a: Min => min_a.child
        })(min => Min(min))

      case avg_q: AggregateExpression if avg_q.aggregateFunction.isInstanceOf[Average] =>
        val expr_q = avg_q.aggregateFunction.children.head

        // Sum over a non-distinct count column whose only argument equals the averaged one.
        // A missing column is not a failure yet: the plain average fallback may still apply.
        val cnt_q = findOutput { agg_a =>
          agg_a.aggregateFunction match {
            case count_a: Count =>
              !agg_a.isDistinct && count_a.children.sameElements(Set(expr_q))
            case _ => false
          }
        }.map(cnt => Sum(cnt.toAttribute))

        // With a count column available, derive the average from the first matching sum or
        // average column. This intentionally does not consult `matchable`, so an unrelated
        // earlier failure cannot change which rewrite is chosen here.
        val derivative: Option[Expression] = cnt_q.flatMap { count =>
          findOutput { agg_a =>
            sameArgument(avg_q, expr_q, agg_a) {
              case sum_a: Sum => sum_a.child
              case avg_a: Average => avg_a.child
            }
          }.map { sum_or_avg =>
            val fun = alias_m(sum_or_avg.toAttribute).child
              .asInstanceOf[AggregateExpression].aggregateFunction
            val accu = if (fun.isInstanceOf[Sum]) {
              Sum(sum_or_avg.toAttribute)
            } else {
              Sum(Multiply(sum_or_avg.toAttribute, Cast(count, sum_or_avg.dataType)))
            }
            Divide(accu, Cast(count, accu.dataType))
          }
        }

        // If nothing could be derived, fall back to averaging a matching average column.
        // NOTE(review): an average of averages equals the overall average only when the
        // partial groups are equally weighted - confirm callers guarantee that here.
        derivative.orElse {
          findOutput { agg_a =>
            sameArgument(avg_q, expr_q, agg_a) { case avg_a: Average => avg_a.child }
          }.map(avg => avg_q.copy(Average(avg.toAttribute), isDistinct = false))
        }.getOrElse { matchable = false; avg_q }

      case other: AggregateExpression =>
        // No rewrite is defined for any other aggregate.
        matchable = false
        other

      case expr: Expression if !expr.isInstanceOf[AggregateFunction] =>
        // Substitute the first column whose non-aggregate definition matches this expression;
        // an expression without such a column is kept and does not fail the match.
        operator_a.outputList.find {
          case alias: Alias =>
            alias_m.get(alias.toAttribute).exists { definition =>
              (definition.child.semanticEquals(expr) || isExpressionMatches(definition, expr)) &&
              !definition.child.isInstanceOf[AggregateExpression]
            }
          case attr: Attribute =>
            alias_m.get(attr).exists { definition =>
              definition.child.semanticEquals(expr) &&
              !definition.child.isInstanceOf[AggregateExpression]
            }
          case _ => false
        }.map(_.toAttribute)
          .getOrElse { expr }
    }

    if (matchable) {
      Some(matched)
    } else {
      None
    }
  }