in gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala [525:627]
/**
 * Rewrites dynamic partition pruning (DPP) filters so that the broadcast subqueries
 * they contain execute on Gluten's columnar operators instead of vanilla Spark ones.
 *
 * Each `DynamicPruningExpression(InSubqueryExec(...))` is transformed:
 *  - `SubqueryBroadcastExec` becomes `ColumnarSubqueryBroadcastExec` (with any nested
 *    `BroadcastExchangeExec` converted to `ColumnarBroadcastExchangeExec`);
 *  - `ReusedSubqueryExec` wrapping a `SubqueryBroadcastExec` is re-pointed at the
 *    cached columnar subquery (AQE only, via `AdaptiveExecutionContext.subqueryCache`).
 *
 * Returns the filters unchanged when scan-only mode is on or columnar broadcast
 * exchange is disabled — columnar subquery broadcast is pointless in those cases.
 *
 * @param partitionFilters the scan's partition filters, possibly containing DPP subqueries
 * @param reuseSubquery    whether AQE subquery reuse is enabled; when true the transformed
 *                         columnar subquery is registered in the AQE subquery cache so that
 *                         later `ReusedSubqueryExec` nodes can find it
 * @return the (possibly) rewritten partition filters
 * @throws UnsupportedOperationException if a reused subquery cannot be resolved to a
 *                                       cached columnar one (silently falling back would
 *                                       re-execute the broadcast)
 */
def transformDynamicPruningExpr(
    partitionFilters: Seq[Expression],
    reuseSubquery: Boolean): Seq[Expression] = {

  // Converts a row-based BroadcastExchangeExec to its columnar counterpart, fixing up
  // the row/columnar boundary of its child as needed.
  def convertBroadcastExchangeToColumnar(
      exchange: BroadcastExchangeExec): ColumnarBroadcastExchangeExec = {
    val newChild = exchange.child match {
      // Child is already columnar behind a C2R boundary: unwrap to get the
      // WholeStageTransformer directly.
      case c2r: ColumnarToRowExecBase => c2r.child
      // Fallback case: the child fell back to a row-based operator.
      case plan: UnaryExecNode if !PlanUtil.isGlutenColumnarOp(plan) =>
        plan.child match {
          case _: ColumnarToRowExec =>
            // Prefer reusing an existing WholeStageTransformer below the C2R;
            // otherwise insert a row-to-columnar transition above the fallback plan.
            val wholeStageTransformer = exchange.find(_.isInstanceOf[WholeStageTransformer])
            wholeStageTransformer.getOrElse(
              BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan))
          case _ =>
            BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan)
        }
      // NOTE(review): this match is not exhaustive — any other child shape throws
      // MatchError. Presumably unreachable by construction; confirm upstream invariants.
    }
    ColumnarBroadcastExchangeExec(exchange.mode, newChild)
  }

  if (
    GlutenConfig.getConf.enableScanOnly || !GlutenConfig.getConf.enableColumnarBroadcastExchange
  ) {
    // Disable ColumnarSubqueryBroadcast for scan-only execution
    // or ColumnarBroadcastExchange was disabled.
    partitionFilters
  } else {
    val newPartitionFilters = partitionFilters.map {
      case dynamicPruning: DynamicPruningExpression =>
        dynamicPruning.transform {
          // Lookup inside subqueries for duplicate exchanges.
          case in: InSubqueryExec =>
            in.plan match {
              case s: SubqueryBroadcastExec =>
                val newIn = s
                  .transform {
                    case exchange: BroadcastExchangeExec =>
                      convertBroadcastExchangeToColumnar(exchange)
                  }
                  .asInstanceOf[SubqueryBroadcastExec]
                val transformSubqueryBroadcast = ColumnarSubqueryBroadcastExec(
                  newIn.name,
                  newIn.index,
                  newIn.buildKeys,
                  newIn.child)

                // When AQE is on, spark will apply ReuseAdaptiveSubquery rule first,
                // it will reuse vanilla SubqueryBroadcastExec,
                // and then use gluten ColumnarOverrides rule to transform Subquery,
                // so all the SubqueryBroadcastExec in the ReusedSubqueryExec will be transformed
                // to a new ColumnarSubqueryBroadcastExec for each SubqueryBroadcastExec,
                // which will lead to execute ColumnarSubqueryBroadcastExec.relationFuture
                // repeatedly even in the ReusedSubqueryExec.
                //
                // On the other hand, it needs to use
                // the AdaptiveSparkPlanExec.AdaptiveExecutionContext to hold the reused map
                // for each query.
                newIn.child match {
                  case a: AdaptiveSparkPlanExec if reuseSubquery =>
                    // When AQE is on and reuseSubquery is on.
                    a.context.subqueryCache
                      .update(newIn.canonicalized, transformSubqueryBroadcast)
                  case _ =>
                }
                in.copy(plan = transformSubqueryBroadcast.asInstanceOf[BaseSubqueryExec])
              case r: ReusedSubqueryExec if r.child.isInstanceOf[SubqueryBroadcastExec] =>
                val newIn = r.child
                  .transform {
                    case exchange: BroadcastExchangeExec =>
                      convertBroadcastExchangeToColumnar(exchange)
                  }
                  .asInstanceOf[SubqueryBroadcastExec]
                newIn.child match {
                  case a: AdaptiveSparkPlanExec =>
                    // Only when AQE is on, it needs to replace SubqueryBroadcastExec
                    // with reused ColumnarSubqueryBroadcastExec
                    val cachedSubquery = a.context.subqueryCache.get(newIn.canonicalized)
                    if (cachedSubquery.isDefined) {
                      in.copy(plan = ReusedSubqueryExec(cachedSubquery.get))
                    } else {
                      // BUGFIX: the original message used a plain string literal, so
                      // "${newIn.canonicalized}" was emitted verbatim (and a space was
                      // missing); use the s-interpolator to include the actual plan.
                      val errMsg = "Can not get the reused ColumnarSubqueryBroadcastExec " +
                        s"by the ${newIn.canonicalized}"
                      logWarning(errMsg)
                      throw new UnsupportedOperationException(errMsg)
                    }
                  case _ =>
                    // Reused subquery without AQE is unexpected here; fail loudly rather
                    // than silently re-executing the broadcast.
                    val errMsg = "Can not get the reused ColumnarSubqueryBroadcastExec " +
                      s"by the ${newIn.canonicalized}"
                    logWarning(errMsg)
                    throw new UnsupportedOperationException(errMsg)
                }
              // Other subquery plan shapes (already columnar, etc.): leave untouched.
              case _ => in
            }
        }
      // Non-DPP filters pass through unchanged.
      case e: Expression => e
    }
    updateSubqueryResult(newPartitionFilters)
    newPartitionFilters
  }
}