override def apply()

in backends-clickhouse/src/main/scala/org/apache/gluten/extension/EliminateDeduplicateAggregateWithAnyJoin.scala [33:93]


  /**
   * Cuts a deduplicating aggregate out of the build side of a left outer shuffled hash join and
   * flags the join with `isAnyJoin = true` instead.
   *
   * The plan is rewritten bottom-up. Only `CHShuffledHashJoinExecTransformer` nodes whose join
   * type is `LeftOuter` are considered; the inspected input is `right` for `BuildRight` and
   * `left` for `BuildLeft`. That input is replaced when it is either
   *   - a `CHHashAggregateExecTransformer` accepted by both `isDeduplicateAggregate` and
   *     `allGroupingKeysAreJoinKeys` (replaced by the aggregate's child), or
   *   - a `ProjectExecTransformer` sitting directly on such an aggregate and projecting only
   *     `AttributeReference`s (the projection is kept and re-parented onto the aggregate's
   *     child).
   *
   * Every other node, and every join that does not qualify, is left as is.
   *
   * The rule is controlled by
   * `CHBackendSettings.GLUTEN_ELIMINATE_DEDUPLICATE_AGGREGATE_WITH_ANY_JOIN`, read from the
   * session conf with `"true"` as the fallback; when it evaluates to `false` the input plan is
   * returned untouched.
   *
   * @param plan
   *   physical plan to rewrite
   * @return
   *   the rewritten plan, or `plan` itself when the rule is disabled
   */
  override def apply(plan: SparkPlan): SparkPlan = {
    val ruleEnabled = spark.conf
      .get(CHBackendSettings.GLUTEN_ELIMINATE_DEDUPLICATE_AGGREGATE_WITH_ANY_JOIN, "true")
      .toBoolean

    // Computes the replacement for a join's build-side input with the deduplicating aggregate
    // removed, or None when `buildInput` has neither of the two supported shapes.
    // Presumably the aggregate is redundant because an "any" join keeps at most one match per
    // key -- confirm against the CH join implementation.
    def withoutDeduplicate(
        hashJoin: CHShuffledHashJoinExecTransformer,
        buildInput: SparkPlan): Option[SparkPlan] = {
      def removable(aggregate: CHHashAggregateExecTransformer): Boolean =
        isDeduplicateAggregate(aggregate) && allGroupingKeysAreJoinKeys(hashJoin, aggregate)

      buildInput match {
        case aggregate: CHHashAggregateExecTransformer if removable(aggregate) =>
          Some(aggregate.child)
        case project @ ProjectExecTransformer(_, aggregate: CHHashAggregateExecTransformer)
            if removable(aggregate) &&
              project.projectList.forall(_.isInstanceOf[AttributeReference]) =>
          // The projection consists only of attribute references, so it is kept and simply
          // re-parented onto the aggregate's child.
          Some(project.copy(child = aggregate.child))
        case _ => None
      }
    }

    if (!ruleEnabled) {
      plan
    } else {
      plan.transformUp {
        case hashJoin: CHShuffledHashJoinExecTransformer
            if hashJoin.buildSide == BuildRight && hashJoin.joinType == LeftOuter =>
          // Hand back the very same node when nothing changes.
          withoutDeduplicate(hashJoin, hashJoin.right)
            .map(newRight => hashJoin.copy(right = newRight, isAnyJoin = true))
            .getOrElse(hashJoin)
        // NOTE(review): with BuildLeft the aggregate is removed from the left input of a
        // LeftOuter join -- confirm this is the intended combination for build-left joins.
        case hashJoin: CHShuffledHashJoinExecTransformer
            if hashJoin.buildSide == BuildLeft && hashJoin.joinType == LeftOuter =>
          withoutDeduplicate(hashJoin, hashJoin.left)
            .map(newLeft => hashJoin.copy(left = newLeft, isAnyJoin = true))
            .getOrElse(hashJoin)
      }
    }
  }