in gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala [192:353]
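/**
 * Attempts to replace a single Spark plan node with its Gluten columnar
 * counterpart. Nodes tagged for fallback, and nodes with no matching
 * transformer, are returned unchanged.
 */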
def doReplace(p: SparkPlan): SparkPlan = {
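// Nodes explicitly tagged for fallback stay on the vanilla Spark path.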
if (FallbackTags.nonEmpty(p)) {
return p
}
p match {
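// Scans: each scan flavor is replaced by its dedicated transformer.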
case plan: BatchScanExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ScanTransformerFactory.createBatchScanTransformer(plan)
case plan: FileSourceScanExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ScanTransformerFactory.createFileSourceScanTransformer(plan)
case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
// TODO: Add DynamicPartitionPruningHiveScanSuite.scala
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
HiveTableScanExecTransformer(plan)
case plan: CoalesceExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ColumnarCoalesceExec(plan.numPartitions, plan.child)
case plan: FilterExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
BackendsApiManager.getSparkPlanExecApiInstance
.genFilterExecTransformer(plan.condition, plan.child)
case plan: ProjectExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ProjectExecTransformer(plan.projectList, plan.child)
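// All three aggregate flavors are lowered to the hash-based transformer;
// sort-based and object-hash aggregates have no separate columnar
// implementation here.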
case plan: HashAggregateExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
HashAggregateExecBaseTransformer.from(plan)
case plan: SortAggregateExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
HashAggregateExecBaseTransformer.from(plan)
case plan: ObjectHashAggregateExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
HashAggregateExecBaseTransformer.from(plan)
case plan: UnionExec =>
val children = plan.children
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ColumnarUnionExec(children)
case plan: ExpandExec =>
val child = plan.child
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
ExpandExecTransformer(plan.projections, plan.output, child)
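// The columnar write itself runs through WriteFilesExecTransformer;
// ColumnarWriteFilesExec wraps it with the same file-format, partitioning,
// bucketing and option metadata.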
case plan: WriteFilesExec =>
val child = plan.child
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val writeTransformer = WriteFilesExecTransformer(
child,
plan.fileFormat,
plan.partitionColumns,
plan.bucketSpec,
plan.options,
plan.staticPartitions)
ColumnarWriteFilesExec(
writeTransformer,
plan.fileFormat,
plan.partitionColumns,
plan.bucketSpec,
plan.options,
plan.staticPartitions)
case plan: SortExec =>
val child = plan.child
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
SortExecTransformer(plan.sortOrder, plan.global, child, plan.testSpillFrequency)
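// Limit/offset extraction differs across Spark versions, so it goes
// through the shim layer.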
case plan: TakeOrderedAndProjectExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = plan.child
val (limit, offset) = SparkShimLoader.getSparkShims.getLimitAndOffsetFromTopK(plan)
TakeOrderedAndProjectExecTransformer(
limit,
plan.sortOrder,
plan.projectList,
child,
offset)
case plan: WindowExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
WindowExecTransformer(
plan.windowExpression,
plan.partitionSpec,
plan.orderSpec,
plan.child)
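// WindowGroupLimitExec does not exist on all supported Spark versions,
// so it is detected and unwrapped through the shim layer.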
case plan if SparkShimLoader.getSparkShims.isWindowGroupLimitExec(plan) =>
val windowGroupLimitPlan = SparkShimLoader.getSparkShims
.getWindowGroupLimitExecShim(plan)
.asInstanceOf[WindowGroupLimitExecShim]
BackendsApiManager.getSparkPlanExecApiInstance.genWindowGroupLimitTransformer(
windowGroupLimitPlan.partitionSpec,
windowGroupLimitPlan.orderSpec,
windowGroupLimitPlan.rankLikeFunction,
windowGroupLimitPlan.limit,
windowGroupLimitPlan.mode,
windowGroupLimitPlan.child
)
case plan: GlobalLimitExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = plan.child
val (limit, offset) =
SparkShimLoader.getSparkShims.getLimitAndOffsetFromGlobalLimit(plan)
LimitExecTransformer(child, offset, limit)
case plan: LocalLimitExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = plan.child
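// A local limit carries no offset component, hence the fixed 0.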
LimitExecTransformer(child, 0L, plan.limit)
case plan: GenerateExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = plan.child
BackendsApiManager.getSparkPlanExecApiInstance.genGenerateTransformer(
plan.generator,
plan.requiredChildOutput,
plan.outer,
plan.generatorOutput,
child)
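// Python UDF evaluation: row-based (BatchEvalPython) and Arrow-based
// (ArrowEvalPython) nodes.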
case plan: BatchEvalPythonExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = plan.child
EvalPythonExecTransformer(plan.udfs, plan.resultAttrs, child)
case plan: ArrowEvalPythonExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = plan.child
// For ArrowEvalPythonExec, the CH backend goes through
// EvalPythonExecTransformer, while the Velox backend uses
// ColumnarArrowEvalPythonExec.
if (
!BackendsApiManager.getSettings.supportColumnarArrowUdf() ||
!GlutenConfig.get.enableColumnarArrowUDF
) {
EvalPythonExecTransformer(plan.udfs, plan.resultAttrs, child)
} else {
BackendsApiManager.getSparkPlanExecApiInstance.createColumnarArrowEvalPythonExec(
plan.udfs,
plan.resultAttrs,
child,
plan.evalType)
}
case plan: RangeExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarRangeExec(
plan.start,
plan.end,
plan.step,
plan.numSlices,
plan.numElements,
plan.output,
plan.children
)
case plan: SampleExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = plan.child
BackendsApiManager.getSparkPlanExecApiInstance.genSampleExecTransformer(
plan.lowerBound,
plan.upperBound,
plan.withReplacement,
plan.seed,
child)
case plan: RDDScanExec if RDDScanTransformer.isSupportRDDScanExec(plan) =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
RDDScanTransformer.getRDDScanTransform(plan)
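// Fallback: anything not offloaded is returned as-is; only non-Gluten
// plans are logged as unsupported.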
case p if !p.isInstanceOf[GlutenPlan] =>
logDebug(s"Transformation for ${p.getClass} is currently not supported.")
p
case other => other
}
}
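// Illustrative sketch (not part of the original file): how a node-level
// replace routine like doReplace is typically driven over a whole physical
// plan. `OffloadExampleRule` is a hypothetical name; `transformUp` is the
// standard Catalyst tree traversal, so every node is offered to doReplace
// bottom-up, and tagged-fallback nodes pass through unchanged.
//
//   import org.apache.spark.sql.catalyst.rules.Rule
//   import org.apache.spark.sql.execution.SparkPlan
//
//   case class OffloadExampleRule() extends Rule[SparkPlan] {
//     override def apply(plan: SparkPlan): SparkPlan =
//       plan.transformUp { case p => doReplace(p) }
//   }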