in gluten-core/src/main/scala/io/glutenproject/extension/ColumnarOverrides.scala [282:528]
/**
 * Recursively rewrites a Spark physical plan into Gluten transformer operators.
 *
 * Dispatch is two-staged:
 *  1. Consult the [[TransformHints]] tag on this node. If the node is marked
 *     TRANSFORM_UNSUPPORTED, keep it as the vanilla Spark operator (with a few
 *     special fallback paths) while still rewriting its children, and return early.
 *  2. Otherwise, map the node to its transformer counterpart via the large match
 *     below; children are always rewritten recursively.
 *
 * Note: several cases deliberately differ in whether they rewrite the child
 * before or after logging — the statement order is preserved as-is.
 *
 * @param plan the physical plan node to rewrite
 * @return the rewritten plan (a transformer node, a columnar exec, or the
 *         original vanilla operator with rewritten children on fallback)
 */
def replaceWithTransformerPlan(plan: SparkPlan): SparkPlan = {
TransformHints.getHint(plan) match {
case _: TRANSFORM_SUPPORTED =>
// supported, break
case _: TRANSFORM_UNSUPPORTED =>
logDebug(s"Columnar Processing for ${plan.getClass} is under row guard.")
// Fallback path: the node stays vanilla, but children are still rewritten.
plan match {
case shj: ShuffledHashJoinExec =>
if (BackendsApiManager.getSettings.recreateJoinExecOnFallback()) {
// Because we manually removed the build side limitation for LeftOuter, LeftSemi and
// RightOuter, need to change the build side back if this join fallback into vanilla
// Spark for execution.
return ShuffledHashJoinExec(
shj.leftKeys,
shj.rightKeys,
shj.joinType,
getSparkSupportedBuildSide(shj),
shj.condition,
replaceWithTransformerPlan(shj.left),
replaceWithTransformerPlan(shj.right),
shj.isSkewJoin
)
} else {
return shj.withNewChildren(shj.children.map(replaceWithTransformerPlan))
}
// Non-transformable scans get dedicated fallback handling (applyScanNotTransformable).
case plan: BatchScanExec =>
return applyScanNotTransformable(plan)
case plan: FileSourceScanExec =>
return applyScanNotTransformable(plan)
case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
return applyScanNotTransformable(plan)
// Any other unsupported node: keep it vanilla, rewrite children only.
case p =>
return p.withNewChildren(p.children.map(replaceWithTransformerPlan))
}
}
// Hint says transformable: replace this operator with its transformer counterpart.
plan match {
case plan: BatchScanExec =>
applyScanTransformer(plan)
case plan: FileSourceScanExec =>
applyScanTransformer(plan)
case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
applyScanTransformer(plan)
case plan: CoalesceExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
CoalesceExecTransformer(plan.numPartitions, replaceWithTransformerPlan(plan.child))
case plan: ProjectExec =>
val columnarChild = replaceWithTransformerPlan(plan.child)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ProjectExecTransformer(plan.projectList, columnarChild)
case plan: FilterExec =>
genFilterExec(plan)
case plan: HashAggregateExec =>
genHashAggregateExec(plan)
// SortAggregate is replaced by the backend's hash aggregate transformer; a
// non-global child sort becomes redundant and is elided (its child is used directly).
case plan: SortAggregateExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
BackendsApiManager.getSparkPlanExecApiInstance
.genHashAggregateExecTransformer(
plan.requiredChildDistributionExpressions,
plan.groupingExpressions,
plan.aggregateExpressions,
plan.aggregateAttributes,
plan.initialInputBufferOffset,
plan.resultExpressions,
plan.child match {
case sort: SortExecTransformer if !sort.global =>
replaceWithTransformerPlan(sort.child)
case sort: SortExec if !sort.global =>
replaceWithTransformerPlan(sort.child)
case _ => replaceWithTransformerPlan(plan.child)
}
)
// ObjectHashAggregate also maps onto the hash aggregate transformer.
case plan: ObjectHashAggregateExec =>
val child = replaceWithTransformerPlan(plan.child)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
BackendsApiManager.getSparkPlanExecApiInstance
.genHashAggregateExecTransformer(
plan.requiredChildDistributionExpressions,
plan.groupingExpressions,
plan.aggregateExpressions,
plan.aggregateAttributes,
plan.initialInputBufferOffset,
plan.resultExpressions,
child
)
case plan: UnionExec =>
val children = plan.children.map(replaceWithTransformerPlan)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ColumnarUnionExec(children)
case plan: ExpandExec =>
val child = replaceWithTransformerPlan(plan.child)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ExpandExecTransformer(plan.projections, plan.output, child)
// WriteFiles: wrap the transformer in a backend-provided columnar write node.
case plan: WriteFilesExec =>
val child = replaceWithTransformerPlan(plan.child)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val writeTransformer = WriteFilesExecTransformer(
child,
plan.fileFormat,
plan.partitionColumns,
plan.bucketSpec,
plan.options,
plan.staticPartitions)
BackendsApiManager.getSparkPlanExecApiInstance.createColumnarWriteFilesExec(
writeTransformer,
plan.fileFormat,
plan.partitionColumns,
plan.bucketSpec,
plan.options,
plan.staticPartitions
)
case plan: SortExec =>
val child = replaceWithTransformerPlan(plan.child)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
SortExecTransformer(plan.sortOrder, plan.global, child, plan.testSpillFrequency)
case plan: TakeOrderedAndProjectExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = replaceWithTransformerPlan(plan.child)
TakeOrderedAndProjectExecTransformer(plan.limit, plan.sortOrder, plan.projectList, child)
// ShuffleExchange: only goes columnar when the child is columnar (or columnar is
// preferred) AND the backend supports columnar shuffle; otherwise keep the vanilla
// exchange over the rewritten child.
case plan: ShuffleExchangeExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = replaceWithTransformerPlan(plan.child)
if (
(child.supportsColumnar || columnarConf.enablePreferColumnar) &&
BackendsApiManager.getSettings.supportColumnarShuffleExec()
) {
if (BackendsApiManager.getSettings.removeHashColumnFromColumnarShuffleExchangeExec()) {
plan.outputPartitioning match {
case HashPartitioning(exprs, _) =>
// getProjectWithHash presumably prepends a hash column; drop(1) removes it
// from the exchange's advertised output — TODO confirm against helper.
val projectChild = getProjectWithHash(exprs, child)
if (projectChild.supportsColumnar) {
ColumnarShuffleExchangeExec(plan, projectChild, projectChild.output.drop(1))
} else {
plan.withNewChildren(Seq(child))
}
case _ =>
// NOTE(review): third argument appears to be "output attributes to keep";
// null seems to mean "no trimming" — confirm ColumnarShuffleExchangeExec's contract.
ColumnarShuffleExchangeExec(plan, child, null)
}
} else if (
BackendsApiManager.getSettings.supportShuffleWithProject(
plan.outputPartitioning,
plan.child)
) {
// The backend may require partitioning expressions to be pre-computed by an
// injected projection; the helper reports how many columns it appended.
val (projectColumnNumber, newPartitioning, newChild) =
addProjectionForShuffleExchange(plan)
if (projectColumnNumber != 0) {
if (newChild.supportsColumnar) {
val newPlan = ShuffleExchangeExec(newPartitioning, newChild, plan.shuffleOrigin)
// the new projections columns are appended at the end.
ColumnarShuffleExchangeExec(
newPlan,
newChild,
newChild.output.dropRight(projectColumnNumber))
} else {
// It's the case that partitioning expressions could be offloaded into native.
plan.withNewChildren(Seq(child))
}
} else {
// No projection was needed; go columnar directly.
ColumnarShuffleExchangeExec(plan, child, null)
}
} else {
ColumnarShuffleExchangeExec(plan, child, null)
}
} else {
// Columnar shuffle not applicable: keep vanilla exchange over the rewritten child.
plan.withNewChildren(Seq(child))
}
case plan: ShuffledHashJoinExec =>
val left = replaceWithTransformerPlan(plan.left)
val right = replaceWithTransformerPlan(plan.right)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
BackendsApiManager.getSparkPlanExecApiInstance
.genShuffledHashJoinExecTransformer(
plan.leftKeys,
plan.rightKeys,
plan.joinType,
plan.buildSide,
plan.condition,
left,
right,
plan.isSkewJoin)
case plan: SortMergeJoinExec =>
val left = replaceWithTransformerPlan(plan.left)
val right = replaceWithTransformerPlan(plan.right)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
SortMergeJoinExecTransformer(
plan.leftKeys,
plan.rightKeys,
plan.joinType,
plan.condition,
left,
right,
plan.isSkewJoin)
case plan: BroadcastExchangeExec =>
val child = replaceWithTransformerPlan(plan.child)
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ColumnarBroadcastExchangeExec(plan.mode, child)
case plan: BroadcastHashJoinExec =>
val left = replaceWithTransformerPlan(plan.left)
val right = replaceWithTransformerPlan(plan.right)
BackendsApiManager.getSparkPlanExecApiInstance
.genBroadcastHashJoinExecTransformer(
plan.leftKeys,
plan.rightKeys,
plan.joinType,
plan.buildSide,
plan.condition,
left,
right,
isNullAwareAntiJoin = plan.isNullAwareAntiJoin)
case plan: CartesianProductExec =>
val left = replaceWithTransformerPlan(plan.left)
val right = replaceWithTransformerPlan(plan.right)
BackendsApiManager.getSparkPlanExecApiInstance
.genCartesianProductExecTransformer(left, right, plan.condition)
case plan: WindowExec =>
WindowExecTransformer(
plan.windowExpression,
plan.partitionSpec,
plan.orderSpec,
replaceWithTransformerPlan(plan.child))
// Limits: LimitTransformer takes (child, offset, count); vanilla limit nodes carry
// no offset here, so 0L is passed.
case plan: GlobalLimitExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = replaceWithTransformerPlan(plan.child)
LimitTransformer(child, 0L, plan.limit)
case plan: LocalLimitExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = replaceWithTransformerPlan(plan.child)
LimitTransformer(child, 0L, plan.limit)
case plan: GenerateExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = replaceWithTransformerPlan(plan.child)
GenerateExecTransformer(
plan.generator,
plan.requiredChildOutput,
plan.outer,
plan.generatorOutput,
child)
case plan: EvalPythonExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = replaceWithTransformerPlan(plan.child)
EvalPythonExecTransformer(plan.udfs, plan.resultAttrs, child)
// No transformer mapping for this operator: keep it as-is, rewrite children only.
case p =>
logDebug(s"Transformation for ${p.getClass} is currently not supported.")
val children = plan.children.map(replaceWithTransformerPlan)
p.withNewChildren(children)
}
}