def transformDynamicPruningExpr()

in gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala [525:627]


  /**
   * Rewrites the subqueries of dynamic-pruning partition filters so that they run on the
   * columnar broadcast path.
   *
   * When scan-only execution is enabled, or columnar broadcast exchange is disabled, the filters
   * are returned untouched. Otherwise, inside every [[DynamicPruningExpression]]:
   *  - an [[InSubqueryExec]] whose plan is a [[SubqueryBroadcastExec]] gets that plan replaced by
   *    a [[ColumnarSubqueryBroadcastExec]] (with any [[BroadcastExchangeExec]] underneath turned
   *    into a [[ColumnarBroadcastExchangeExec]]);
   *  - an [[InSubqueryExec]] whose plan is a [[ReusedSubqueryExec]] over a
   *    [[SubqueryBroadcastExec]] is re-pointed at the columnar subquery previously stored in the
   *    adaptive execution context's subquery cache.
   *
   * After the rewrite, `updateSubqueryResult` is invoked on the new filters.
   *
   * @param partitionFilters
   *   partition filters of a scan; expressions that are not dynamic-pruning expressions are kept
   *   as they are.
   * @param reuseSubquery
   *   when true and the subquery child is an [[AdaptiveSparkPlanExec]], the newly created
   *   columnar subquery is registered in the adaptive subquery cache so that reused subqueries
   *   can pick it up.
   * @return
   *   the (possibly rewritten) partition filters, in the same order as the input.
   * @throws UnsupportedOperationException
   *   if a reused subquery has no matching entry in the adaptive subquery cache, or its child is
   *   not an [[AdaptiveSparkPlanExec]].
   */
  def transformDynamicPruningExpr(
      partitionFilters: Seq[Expression],
      reuseSubquery: Boolean): Seq[Expression] = {

    // Builds the columnar counterpart of a vanilla broadcast exchange, picking a columnar child
    // for it.
    // NOTE(review): the match below only covers two shapes of `exchange.child`; any other child
    // raises a scala.MatchError. Confirm that no other shape can reach this point.
    def convertBroadcastExchangeToColumnar(
        exchange: BroadcastExchangeExec): ColumnarBroadcastExchangeExec = {
      val newChild = exchange.child match {
        // get WholeStageTransformer directly
        case c2r: ColumnarToRowExecBase => c2r.child
        // in fallback case
        case plan: UnaryExecNode if !PlanUtil.isGlutenColumnarOp(plan) =>
          plan.child match {
            case _: ColumnarToRowExec =>
              // Prefer the first WholeStageTransformer found under the exchange; otherwise wrap
              // the fallback plan in a row-to-columnar conversion.
              val wholeStageTransformer = exchange.find(_.isInstanceOf[WholeStageTransformer])
              wholeStageTransformer.getOrElse(
                BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan))
            case _ =>
              BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan)
          }
      }
      ColumnarBroadcastExchangeExec(exchange.mode, newChild)
    }

    // Logs and raises the failure used when a reused subquery cannot be resolved to an already
    // transformed ColumnarSubqueryBroadcastExec.
    def failOnMissingReusedSubquery(subquery: SubqueryBroadcastExec): Nothing = {
      val errMsg = "Can not get the reused ColumnarSubqueryBroadcastExec " +
        s"by the ${subquery.canonicalized}"
      logWarning(errMsg)
      throw new UnsupportedOperationException(errMsg)
    }

    if (
      GlutenConfig.getConf.enableScanOnly || !GlutenConfig.getConf.enableColumnarBroadcastExchange
    ) {
      // Disable ColumnarSubqueryBroadcast for scan-only execution
      // or ColumnarBroadcastExchange was disabled.
      partitionFilters
    } else {
      val newPartitionFilters = partitionFilters.map {
        case dynamicPruning: DynamicPruningExpression =>
          dynamicPruning.transform {
            // Lookup inside subqueries for duplicate exchanges.
            case in: InSubqueryExec =>
              in.plan match {
                case s: SubqueryBroadcastExec =>
                  val newIn = s
                    .transform {
                      case exchange: BroadcastExchangeExec =>
                        convertBroadcastExchangeToColumnar(exchange)
                    }
                    .asInstanceOf[SubqueryBroadcastExec]
                  val transformSubqueryBroadcast = ColumnarSubqueryBroadcastExec(
                    newIn.name,
                    newIn.index,
                    newIn.buildKeys,
                    newIn.child)

                  // When AQE is on, spark will apply ReuseAdaptiveSubquery rule first,
                  // it will reuse vanilla SubqueryBroadcastExec,
                  // and then use gluten ColumnarOverrides rule to transform Subquery,
                  // so all the SubqueryBroadcastExec in the ReusedSubqueryExec will be transformed
                  // to a new ColumnarSubqueryBroadcastExec for each SubqueryBroadcastExec,
                  // which will lead to execute ColumnarSubqueryBroadcastExec.relationFuture
                  // repeatedly even in the ReusedSubqueryExec.
                  //
                  // On the other hand, it needs to use
                  // the AdaptiveSparkPlanExec.AdaptiveExecutionContext to hold the reused map
                  // for each query.
                  newIn.child match {
                    case a: AdaptiveSparkPlanExec if reuseSubquery =>
                      // When AQE is on and reuseSubquery is on.
                      a.context.subqueryCache
                        .update(newIn.canonicalized, transformSubqueryBroadcast)
                    case _ =>
                  }
                  in.copy(plan = transformSubqueryBroadcast.asInstanceOf[BaseSubqueryExec])
                case r: ReusedSubqueryExec if r.child.isInstanceOf[SubqueryBroadcastExec] =>
                  val newIn = r.child
                    .transform {
                      case exchange: BroadcastExchangeExec =>
                        convertBroadcastExchangeToColumnar(exchange)
                    }
                    .asInstanceOf[SubqueryBroadcastExec]
                  newIn.child match {
                    case a: AdaptiveSparkPlanExec =>
                      // Only when AQE is on, it needs to replace SubqueryBroadcastExec
                      // with reused ColumnarSubqueryBroadcastExec
                      a.context.subqueryCache.get(newIn.canonicalized) match {
                        case Some(cachedSubquery) =>
                          in.copy(plan = ReusedSubqueryExec(cachedSubquery))
                        case None =>
                          failOnMissingReusedSubquery(newIn)
                      }
                    case _ =>
                      failOnMissingReusedSubquery(newIn)
                  }
                case _ => in
              }
          }
        case e: Expression => e
      }
      updateSubqueryResult(newPartitionFilters)
      newPartitionFilters
    }
  }