in gluten-core/src/main/scala/io/glutenproject/backendsapi/SparkPlanExecApi.scala [400:514]
def joinFallback(
JoinType: JoinType,
leftOutputSet: AttributeSet,
right: AttributeSet,
condition: Option[Expression]): Boolean = false
/** default function to generate window function node */
def genWindowFunctionsNode(
windowExpression: Seq[NamedExpression],
windowExpressionNodes: JList[WindowFunctionNode],
originalInputAttributes: Seq[Attribute],
args: JMap[String, JLong]): Unit = {
windowExpression.map {
windowExpr =>
val aliasExpr = windowExpr.asInstanceOf[Alias]
val columnName = s"${aliasExpr.name}_${aliasExpr.exprId.id}"
val wExpression = aliasExpr.child.asInstanceOf[WindowExpression]
wExpression.windowFunction match {
case wf @ (RowNumber() | Rank(_) | DenseRank(_) | CumeDist() | PercentRank(_)) =>
val aggWindowFunc = wf.asInstanceOf[AggregateWindowFunction]
val frame = aggWindowFunc.frame.asInstanceOf[SpecifiedWindowFrame]
val windowFunctionNode = ExpressionBuilder.makeWindowFunction(
WindowFunctionsBuilder.create(args, aggWindowFunc).toInt,
new JArrayList[ExpressionNode](),
columnName,
ConverterUtils.getTypeNode(aggWindowFunc.dataType, aggWindowFunc.nullable),
WindowExecTransformer.getFrameBound(frame.upper),
WindowExecTransformer.getFrameBound(frame.lower),
frame.frameType.sql
)
windowExpressionNodes.add(windowFunctionNode)
case aggExpression: AggregateExpression =>
val frame = wExpression.windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
val aggregateFunc = aggExpression.aggregateFunction
val substraitAggFuncName = ExpressionMappings.expressionsMap.get(aggregateFunc.getClass)
if (substraitAggFuncName.isEmpty) {
throw new UnsupportedOperationException(s"Not currently supported: $aggregateFunc.")
}
val childrenNodeList = aggregateFunc.children
.map(
ExpressionConverter
.replaceWithExpressionTransformer(_, originalInputAttributes)
.doTransform(args))
.asJava
val windowFunctionNode = ExpressionBuilder.makeWindowFunction(
AggregateFunctionsBuilder.create(args, aggExpression.aggregateFunction).toInt,
childrenNodeList,
columnName,
ConverterUtils.getTypeNode(aggExpression.dataType, aggExpression.nullable),
WindowExecTransformer.getFrameBound(frame.upper),
WindowExecTransformer.getFrameBound(frame.lower),
frame.frameType.sql
)
windowExpressionNodes.add(windowFunctionNode)
case wf @ (Lead(_, _, _, _) | Lag(_, _, _, _)) =>
val offset_wf = wf.asInstanceOf[FrameLessOffsetWindowFunction]
val frame = offset_wf.frame.asInstanceOf[SpecifiedWindowFrame]
val childrenNodeList = new JArrayList[ExpressionNode]()
childrenNodeList.add(
ExpressionConverter
.replaceWithExpressionTransformer(
offset_wf.input,
attributeSeq = originalInputAttributes)
.doTransform(args))
childrenNodeList.add(
ExpressionConverter
.replaceWithExpressionTransformer(
offset_wf.offset,
attributeSeq = originalInputAttributes)
.doTransform(args))
childrenNodeList.add(
ExpressionConverter
.replaceWithExpressionTransformer(
offset_wf.default,
attributeSeq = originalInputAttributes)
.doTransform(args))
val windowFunctionNode = ExpressionBuilder.makeWindowFunction(
WindowFunctionsBuilder.create(args, offset_wf).toInt,
childrenNodeList,
columnName,
ConverterUtils.getTypeNode(offset_wf.dataType, offset_wf.nullable),
WindowExecTransformer.getFrameBound(frame.upper),
WindowExecTransformer.getFrameBound(frame.lower),
frame.frameType.sql
)
windowExpressionNodes.add(windowFunctionNode)
case wf @ NthValue(input, offset: Literal, ignoreNulls: Boolean) =>
val frame = wExpression.windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
val childrenNodeList = new JArrayList[ExpressionNode]()
childrenNodeList.add(
ExpressionConverter
.replaceWithExpressionTransformer(input, attributeSeq = originalInputAttributes)
.doTransform(args))
childrenNodeList.add(LiteralTransformer(offset).doTransform(args))
childrenNodeList.add(LiteralTransformer(Literal(ignoreNulls)).doTransform(args))
val windowFunctionNode = ExpressionBuilder.makeWindowFunction(
WindowFunctionsBuilder.create(args, wf).toInt,
childrenNodeList,
columnName,
ConverterUtils.getTypeNode(wf.dataType, wf.nullable),
frame.upper.sql,
frame.lower.sql,
frame.frameType.sql
)
windowExpressionNodes.add(windowFunctionNode)
case _ =>
throw new UnsupportedOperationException(
"unsupported window function type: " +
wExpression.windowFunction)
}
}
}