in mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala [115:253]
/**
 * Rewrites every `GroupBy`-over-`Select` in `plan` whose fact child can be pre-aggregated.
 *
 * A node is considered only when the `Select` children are `fact :: dims` with at least one
 * dim, `fact` is a [[ModularRelation]], every child is a `modular.LeafNode`, every join edge
 * is `Inner`, and the adjacency list of the `Select` has at most one key.
 *
 * The matched `GroupBy` is returned unchanged when `findPushThroughAggregates` yields nothing,
 * or when the conditions evaluable on the dimension children reference attributes that are
 * not referenced by the group-by predicate list.
 *
 * Otherwise the fact child is replaced by a new `GroupBy` over a `Select` of the fact alone.
 * That inner `GroupBy` groups on the fact attributes referenced by the join conditions, by the
 * conditions evaluable on the fact, and by the outer predicate list, and it outputs the
 * pushed-through aggregates. The replacement is registered under alias index 0 with the name
 * `gen_harmonized_<databaseName>_<tableName>`, and every attribute found in its output set is
 * re-created with that name as qualifier (and `nullable = true`) in the outer output list,
 * predicate list and `GroupBy` expressions.
 *
 * @param plan the modular plan to transform
 * @return the transformed plan; nodes that do not match are left as they were
 */
def apply(plan: ModularPlan): ModularPlan = {
  plan transform {
    case g@GroupBy(_, _, _, _,
      s@Select(_, _, _, aliasm, jedges, (fact: ModularRelation) :: dims, _, _, _, _), _, _, _)
      if s.adjacencyList.keySet.size <= 1 &&
         jedges.forall(e => e.joinType == Inner) && // !s.flags.hasFlag(DISTINCT) &&
         (fact :: dims).forall(_.isInstanceOf[modular.LeafNode]) &&
         dims.nonEmpty =>
      // Folds a sequence of reference sets into a single attribute set.
      def unionOf(sets: Seq[AttributeSet]): AttributeSet = {
        sets.foldLeft(AttributeSet.empty)(_ ++ _)
      }

      // Maps the output attribute of each alias-over-attribute to the attribute it renames.
      val selAliasMap = AttributeMap(s.outputList.collect {
        case a@Alias(child: Attribute, _) => (a.toAttribute, child)
      })
      val aggTransMap = findPushThroughAggregates(g.outputList, selAliasMap, fact)
      val constraintsAttributeSet =
        unionOf(dims.flatMap(s.extractEvaluableConditions).map(_.references))
      val groupingAttributeSet = unionOf(g.predicateList.map(_.references))
      if (aggTransMap.isEmpty ||
          // TODO: the following condition is too pessimistic, more work needed using methods
          // similar to those in trait
          // QueryPlanConstraints
          !constraintsAttributeSet.subsetOf(groupingAttributeSet)) {
        g
      } else {
        // Evaluated once instead of once per filtered attribute.
        val factOutputSet = fact.outputSet

        // Keeps only the attributes that are produced by the fact relation.
        def onFact(attrs: AttributeSet) = attrs.filter(factOutputSet.contains(_))

        val starJExprs = dims.flatMap(dim => s.extractJoinConditions(fact, dim))
        val gJAttributes = onFact(unionOf(starJExprs.map(_.references)))
        val gFAttributes = onFact(unionOf(s.extractEvaluableConditions(fact).map(_.references)))
        val gGAttributes = onFact(groupingAttributeSet)
        // Grouping columns of the pre-aggregation on the fact relation.
        val gAttributes = (gJAttributes ++ gFAttributes ++ gGAttributes).toSeq
        val oAggregates = aggTransMap.values.flatMap(_._2).toSeq
        // Only the alias registered for child index 0 (the fact) is carried over, if any.
        val tAliasMap = aliasm.get(0).toSeq.map(name => 0 -> name).toMap
        val sOutput = (unionOf(oAggregates.map(_.references)) ++ AttributeSet(gAttributes)).toSeq
        val hFactSel = plans.modular
          .Select(
            sOutput,
            fact.output,
            Seq.empty,
            tAliasMap,
            Seq.empty,
            fact :: Nil,
            NoFlags,
            Seq.empty,
            Seq.empty)
        val hFact = plans.modular
          .GroupBy(
            gAttributes ++ oAggregates,
            sOutput,
            gAttributes,
            None,
            hFactSel,
            NoFlags,
            Seq.empty)
        val hFactName = s"gen_harmonized_${ fact.databaseName }_${ fact.tableName }"

        // Re-creates a reference with the same name, type and exprId, qualified by hFactName.
        def harmonizedRef(ref: NamedExpression) = {
          ExpressionHelper.createReference(
            ref.name, ref.dataType, nullable = true, Metadata.empty, ref.exprId, Some(hFactName))
        }

        // Written as remove-then-add (not `updated`) to keep the original map construction.
        val hAliasMap = (aliasm - 0) + (0 -> hFactName)
        val hInputList = gAttributes ++ oAggregates.map(_.toAttribute) ++ dims.flatMap(_.output)
        // Plain attributes and aliases of plain attributes survive from the original Select.
        val attrOutputList = s.outputList.filter {
          case _: Attribute => true
          case Alias(_: Attribute, _) => true
          case _ => false
        }
        val hFactOutputSet = hFact.outputSet
        // Update the outputlist qualifier
        val hOutputList = (attrOutputList ++ oAggregates.map(harmonizedRef)).map { expr =>
          expr.transform {
            case ref: Attribute if hFactOutputSet.contains(ref) => harmonizedRef(ref)
          }
        }.asInstanceOf[Seq[NamedExpression]]
        // Update the predicate qualifier
        val hPredList = s.predicateList.map { pred =>
          pred.transform {
            case ref: Attribute if hFactOutputSet.contains(ref) => harmonizedRef(ref)
          }
        }
        val hSel = s.copy(
          outputList = hOutputList,
          inputList = hInputList,
          aliasMap = hAliasMap,
          predicateList = hPredList,
          children = hFact :: dims)
        // Swap in the rewritten expression for every output position that was pushed through.
        val gOutputList = g.outputList.zipWithIndex.map { case (expr, index) =>
          aggTransMap.get(index).map(_._1).getOrElse(expr)
        }
        val wip = g.copy(outputList = gOutputList, inputList = hInputList, child = hSel)
        wip.transformExpressions {
          case ref: Attribute if hFactOutputSet.contains(ref) => harmonizedRef(ref)
        }
      }
  }
}