in integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVMatcher.scala [78:163]
def factorOutSubsumer(
compensation: ModularPlan,
subsumer: Matchable,
aliasMapMain: Map[Int, String]): ModularPlan = {
// Create aliasMap with attribute to alias reference attribute
val aliasMap = AttributeMap(
subsumer.outputList.collect {
case a: Alias if a.child.isInstanceOf[Attribute] =>
(a.child.asInstanceOf[Attribute], a.toAttribute)
})
// Create aliasMap with Expression to alias reference attribute
val aliasMapExp =
subsumer.outputList.collect {
case alias: Alias if alias.child.isInstanceOf[Expression] &&
!alias.child.isInstanceOf[AggregateExpression] =>
alias.child match {
case function: ScalaUDF if function.function.isInstanceOf[TimeSeriesFunction] =>
getTransformedTimeSeriesFunction(function) -> alias.toAttribute
case cast: Cast if cast.child.isInstanceOf[AttributeReference] =>
getTransformedCastOrImplicitCastExpression(cast) -> alias.toAttribute
case implicitCastInputTypeExp: ImplicitCastInputTypes =>
getTransformedCastOrImplicitCastExpression(implicitCastInputTypeExp) ->
alias.toAttribute
case _ =>
alias.child -> alias.toAttribute
}
}.toMap
// Check and replace all alias references with subsumer alias map references.
val compensation1 = compensation.transform {
case plan if !plan.skip && plan != subsumer =>
plan.transformExpressions {
case reference: AttributeReference =>
aliasMap.get(reference).map {
attribute =>
AttributeReference(
attribute.name, attribute.dataType)(
exprId = attribute.exprId,
qualifier = reference.qualifier)
}.getOrElse(reference)
case expression: Expression =>
var attribute = aliasMapExp.get(expression)
// attribute will be empty, if attribute name is of different case. If empty, change
// case of scalaUDF present in expression and get updated expression from aliasMap
if (attribute.isEmpty) {
val newExp = expression transform {
case function: ScalaUDF if function.function.isInstanceOf[TimeSeriesFunction] =>
getTransformedTimeSeriesFunction(function)
case cast: Cast if cast.child.isInstanceOf[AttributeReference] =>
getTransformedCastOrImplicitCastExpression(cast)
case implicitCastInputTypeExp: ImplicitCastInputTypes =>
getTransformedCastOrImplicitCastExpression(implicitCastInputTypeExp)
}
attribute = aliasMapExp.get(newExp)
}
attribute.map {
reference =>
AttributeReference(reference.name, reference.dataType)(exprId = reference.exprId)
}.getOrElse(expression)
}
}
val subqueryAttributeSet = SQLBuilder.collectAttributeSet(subsumer.outputList)
if (SQLBuilder.collectDuplicateNames(subqueryAttributeSet).nonEmpty) {
new UnsupportedOperationException(
s"duplicate name(s): ${ subsumer.output.map(_.toString + ", ") }")
}
if (aliasMapMain.size == 1) {
val subsumerName: Option[String] = aliasMapMain.get(0)
// Replace all compensation1 attributes with refrences of subsumer attribute set
val compensationFinal = compensation1.transformExpressions {
case ref: Attribute if subqueryAttributeSet.contains(ref) =>
CarbonToSparkAdapter.createAttributeReference(
ref.name, ref.dataType, nullable = true, metadata = Metadata.empty,
exprId = ref.exprId, qualifier = subsumerName)
case alias: Alias if subqueryAttributeSet.contains(alias.toAttribute) =>
CarbonToSparkAdapter.createAliasRef(
alias.child, alias.name, alias.exprId, subsumerName)
}
compensationFinal
} else {
compensation1
}
}