in backends-clickhouse/src/main/scala/org/apache/gluten/extension/EliminateDeduplicateAggregateWithAnyJoin.scala [33:93]
/**
 * Removes a deduplicating aggregate from the build side of a left-outer shuffled hash join and
 * marks the join with `isAnyJoin = true` instead.
 *
 * The rewrite fires only when:
 *  - the config `GLUTEN_ELIMINATE_DEDUPLICATE_AGGREGATE_WITH_ANY_JOIN` is enabled (default "true"),
 *  - the join is a `CHShuffledHashJoinExecTransformer` with join type `LeftOuter`,
 *  - the build-side child is a `CHHashAggregateExecTransformer`, or a `ProjectExecTransformer`
 *    whose project list contains only attribute references, directly over such an aggregate,
 *  - `isDeduplicateAggregate(aggregate)` and `allGroupingKeysAreJoinKeys(hashJoin, aggregate)`
 *    both hold.
 *
 * NOTE(review): correctness presumably relies on the "any join" execution keeping at most one
 * build-side row per join key, which reproduces what the removed aggregate did — confirm
 * against the CH join implementation.
 *
 * @param plan physical plan to rewrite bottom-up
 * @return the rewritten plan, or `plan` unchanged when the rule is disabled
 */
override def apply(plan: SparkPlan): SparkPlan = {
  val ruleEnabled = spark.conf
    .get(CHBackendSettings.GLUTEN_ELIMINATE_DEDUPLICATE_AGGREGATE_WITH_ANY_JOIN, "true")
    .toBoolean

  if (!ruleEnabled) {
    plan
  } else {
    plan.transformUp {
      case hashJoin: CHShuffledHashJoinExecTransformer
          if hashJoin.buildSide == BuildRight && hashJoin.joinType == LeftOuter =>
        stripDeduplicateAggregate(hashJoin, hashJoin.right)
          .map(newRight => hashJoin.copy(right = newRight, isAnyJoin = true))
          .getOrElse(hashJoin)
      case hashJoin: CHShuffledHashJoinExecTransformer
          if hashJoin.buildSide == BuildLeft && hashJoin.joinType == LeftOuter =>
        stripDeduplicateAggregate(hashJoin, hashJoin.left)
          .map(newLeft => hashJoin.copy(left = newLeft, isAnyJoin = true))
          .getOrElse(hashJoin)
    }
  }
}

/**
 * Computes the replacement for one build-side child of `hashJoin` with its deduplicating
 * aggregate removed.
 *
 * The build child is handled in one of two ways:
 *  - a bare aggregate is replaced by the aggregate's own child;
 *  - an attribute-only project over an aggregate keeps the project and re-parents it onto the
 *    aggregate's child.
 *
 * @param hashJoin   the left-outer join whose build side is being inspected
 * @param buildChild the join's build-side child (left or right, per `hashJoin.buildSide`)
 * @return the new build-side child, or `None` when the elimination does not apply
 */
private def stripDeduplicateAggregate(
    hashJoin: CHShuffledHashJoinExecTransformer,
    buildChild: SparkPlan): Option[SparkPlan] = buildChild match {
  case aggregate: CHHashAggregateExecTransformer
      if isDeduplicateAggregate(aggregate) && allGroupingKeysAreJoinKeys(hashJoin, aggregate) =>
    Some(aggregate.child)
  case project @ ProjectExecTransformer(_, aggregate: CHHashAggregateExecTransformer)
      if isDeduplicateAggregate(aggregate) &&
        allGroupingKeysAreJoinKeys(hashJoin, aggregate) &&
        // Only pure column pass-through projects are safe to keep over the aggregate's child.
        project.projectList.forall {
          case _: AttributeReference => true
          case _ => false
        } =>
    Some(project.copy(child = aggregate.child))
  case _ => None
}