def factorOutSubsumer()

in integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala [78:163]


  def factorOutSubsumer(
      compensation: ModularPlan,
      subsumer: Matchable,
      aliasMapMain: Map[Int, String]): ModularPlan = {

    // Create aliasMap with attribute to alias reference attribute
    val aliasMap = AttributeMap(
      subsumer.outputList.collect {
        case a: Alias if a.child.isInstanceOf[Attribute] =>
          (a.child.asInstanceOf[Attribute], a.toAttribute)
      })

    // Create aliasMap with Expression to alias reference attribute
    val aliasMapExp =
      subsumer.outputList.collect {
        case alias: Alias if alias.child.isInstanceOf[Expression] &&
          !alias.child.isInstanceOf[AggregateExpression] =>
          alias.child match {
            case function: ScalaUDF if function.function.isInstanceOf[TimeSeriesFunction] =>
              getTransformedTimeSeriesFunction(function) -> alias.toAttribute
            case cast: Cast if cast.child.isInstanceOf[AttributeReference] =>
              getTransformedCastOrImplicitCastExpression(cast) -> alias.toAttribute
            case implicitCastInputTypeExp: ImplicitCastInputTypes =>
              getTransformedCastOrImplicitCastExpression(implicitCastInputTypeExp) ->
              alias.toAttribute
            case _ =>
              alias.child -> alias.toAttribute
          }
      }.toMap

    // Check and replace all alias references with subsumer alias map references.
    val compensation1 = compensation.transform {
      case plan if !plan.skip && plan != subsumer =>
        plan.transformExpressions {
          case reference: AttributeReference =>
            aliasMap.get(reference).map {
              attribute =>
                AttributeReference(
                  attribute.name, attribute.dataType)(
                  exprId = attribute.exprId,
                  qualifier = reference.qualifier)
              }.getOrElse(reference)
          case expression: Expression =>
            var attribute = aliasMapExp.get(expression)
            // attribute will be empty, if attribute name is of different case. If empty, change
            // case of scalaUDF present in expression and get updated expression from aliasMap
            if (attribute.isEmpty) {
              val newExp = expression transform {
                case function: ScalaUDF if function.function.isInstanceOf[TimeSeriesFunction] =>
                  getTransformedTimeSeriesFunction(function)
                case cast: Cast if cast.child.isInstanceOf[AttributeReference] =>
                  getTransformedCastOrImplicitCastExpression(cast)
                case implicitCastInputTypeExp: ImplicitCastInputTypes =>
                  getTransformedCastOrImplicitCastExpression(implicitCastInputTypeExp)
              }
              attribute = aliasMapExp.get(newExp)
            }
            attribute.map {
              reference =>
                AttributeReference(reference.name, reference.dataType)(exprId = reference.exprId)
            }.getOrElse(expression)
        }
    }

    val subqueryAttributeSet = SQLBuilder.collectAttributeSet(subsumer.outputList)
    if (SQLBuilder.collectDuplicateNames(subqueryAttributeSet).nonEmpty) {
      new UnsupportedOperationException(
        s"duplicate name(s): ${ subsumer.output.map(_.toString + ", ") }")
    }
    if (aliasMapMain.size == 1) {
      val subsumerName: Option[String] = aliasMapMain.get(0)
      // Replace all compensation1 attributes with refrences of subsumer attribute set
      val compensationFinal = compensation1.transformExpressions {
        case ref: Attribute if subqueryAttributeSet.contains(ref) =>
          CarbonToSparkAdapter.createAttributeReference(
            ref.name, ref.dataType, nullable = true, metadata = Metadata.empty,
            exprId = ref.exprId, qualifier = subsumerName)
        case alias: Alias if subqueryAttributeSet.contains(alias.toAttribute) =>
          CarbonToSparkAdapter.createAliasRef(
            alias.child, alias.name, alias.exprId, subsumerName)
      }
      compensationFinal
    } else {
      compensation1
    }
  }