in gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala [44:92]
/**
 * Decides whether `plan` carries expressions that should be pulled out into a pre-project.
 *
 * The decision is operator specific and delegates to the `isNotAttribute` and
 * `isNotAttributeAndLiteral` helpers:
 *   - `SortExec` / `TakeOrderedAndProjectExec`: a sort key is flagged by `isNotAttribute`.
 *   - `BaseAggregateExec`: a grouping expression or an aggregate filter is flagged by
 *     `isNotAttribute`, or (in `Partial` / `Complete` mode only) a child of the aggregate
 *     function is flagged by `isNotAttributeAndLiteral`. Typed aggregates never qualify.
 *   - `WindowExec`: an order key or partition key is flagged by `isNotAttribute`, an input of a
 *     window function / windowed aggregate is flagged, or `windowNeedPreComputeRangeFrame`
 *     reports `true`.
 *   - window group limit (detected through the Spark shims): an order key or partition key is
 *     flagged by `isNotAttribute`.
 *   - `ExpandExec`: any projected expression is flagged by `isNotAttributeAndLiteral`.
 *
 * @param plan
 *   the physical operator to inspect
 * @return
 *   `true` when one of the checks above fires; `false` otherwise, including for every operator
 *   type not listed
 */
private def needsPreProject(plan: SparkPlan): Boolean = plan match {
  case sort: SortExec =>
    sort.sortOrder.map(_.child).exists(isNotAttribute)

  case take: TakeOrderedAndProjectExec =>
    take.sortOrder.map(_.child).exists(isNotAttribute)

  case agg: BaseAggregateExec =>
    // Kept as a local def so it is only evaluated when the grouping check below is false.
    def aggregateCallNeedsPreProject: Boolean = agg.aggregateExpressions.exists {
      aggExpr =>
        aggExpr.aggregateFunction match {
          case _: TypedAggregateExpression =>
            // Children of a TypedAggregateExpression cannot be moved into a pre-project, and
            // Gluten does not support TypedAggregateExpression anyway.
            false
          case function =>
            // Function children are only examined for Partial and Complete mode.
            val modeInspectsChildren = aggExpr.mode match {
              case Partial | Complete => true
              case _ => false
            }
            aggExpr.filter.exists(isNotAttribute) ||
            (modeInspectsChildren && function.children.exists(isNotAttributeAndLiteral))
        }
    }

    agg.groupingExpressions.exists(isNotAttribute) || aggregateCallNeedsPreProject

  case window: WindowExec =>
    // Searches every window expression tree for a WindowExpression whose function inputs are
    // flagged; a local def keeps the evaluation lazy within the `||` chain below.
    def functionInputNeedsPreProject: Boolean = window.windowExpression.exists {
      named =>
        named.find {
          case windowExpr: WindowExpression =>
            windowExpr.windowFunction match {
              case function: WindowFunction =>
                function.children.exists(isNotAttributeAndLiteral)
              case aggExpr: AggregateExpression =>
                aggExpr.filter.exists(isNotAttribute) ||
                aggExpr.aggregateFunction.children.exists(isNotAttributeAndLiteral)
              case _ => false
            }
          case _ => false
        }.nonEmpty
    }

    window.orderSpec.map(_.child).exists(isNotAttribute) ||
    window.partitionSpec.exists(isNotAttribute) ||
    functionInputNeedsPreProject ||
    windowNeedPreComputeRangeFrame(window)

  case limitPlan if SparkShimLoader.getSparkShims.isWindowGroupLimitExec(limitPlan) =>
    val groupLimit = SparkShimLoader.getSparkShims
      .getWindowGroupLimitExecShim(limitPlan)
      .asInstanceOf[WindowGroupLimitExecShim]
    groupLimit.orderSpec.map(_.child).exists(isNotAttribute) ||
    groupLimit.partitionSpec.exists(isNotAttribute)

  case expand: ExpandExec =>
    expand.projections.exists(_.exists(isNotAttributeAndLiteral))

  case _ => false
}