in backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala [711:836]
override def genWindowFunctionsNode(
windowExpression: Seq[NamedExpression],
windowExpressionNodes: JList[WindowFunctionNode],
originalInputAttributes: Seq[Attribute],
context: SubstraitContext): Unit = {
windowExpression.map {
windowExpr =>
val aliasExpr = windowExpr.asInstanceOf[Alias]
val columnName = s"${aliasExpr.name}_${aliasExpr.exprId.id}"
val wExpression = aliasExpr.child.asInstanceOf[WindowExpression]
wExpression.windowFunction match {
case wf @ (RowNumber() | Rank(_) | DenseRank(_) | PercentRank(_)) =>
val aggWindowFunc = wf.asInstanceOf[AggregateWindowFunction]
val frame = aggWindowFunc.frame.asInstanceOf[SpecifiedWindowFrame]
val windowFunctionNode = ExpressionBuilder.makeWindowFunction(
WindowFunctionsBuilder.create(context, aggWindowFunc).toInt,
new JArrayList[ExpressionNode](),
columnName,
ConverterUtils.getTypeNode(aggWindowFunc.dataType, aggWindowFunc.nullable),
frame.upper,
frame.lower,
frame.frameType.sql,
originalInputAttributes.asJava
)
windowExpressionNodes.add(windowFunctionNode)
case aggExpression: AggregateExpression =>
val frame = wExpression.windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
val aggregateFunc = aggExpression.aggregateFunction
val substraitAggFuncName = ExpressionMappings.expressionsMap.get(aggregateFunc.getClass)
if (substraitAggFuncName.isEmpty) {
throw new GlutenNotSupportException(s"Not currently supported: $aggregateFunc.")
}
val childrenNodeList = new JArrayList[ExpressionNode]()
aggregateFunc.children.foreach(
expr =>
childrenNodeList.add(
ExpressionConverter
.replaceWithExpressionTransformer(expr, originalInputAttributes)
.doTransform(context)))
val windowFunctionNode = ExpressionBuilder.makeWindowFunction(
CHExpressions.createAggregateFunction(context, aggExpression.aggregateFunction).toInt,
childrenNodeList,
columnName,
ConverterUtils.getTypeNode(aggExpression.dataType, aggExpression.nullable),
frame.upper,
frame.lower,
frame.frameType.sql,
originalInputAttributes.asJava
)
windowExpressionNodes.add(windowFunctionNode)
case wf @ (Lead(_, _, _, _) | Lag(_, _, _, _)) =>
val (offsetWf, frame) = wf match {
case lead @ Lead(input, offset, default, ignoreNulls) =>
// When the offset value of the lead is negative, will convert to lag function
lead.offset match {
case IntegerLiteral(value) if value < 0 =>
val newWf = Lag(input, Literal(math.abs(value)), default, ignoreNulls)
(newWf, newWf.frame.asInstanceOf[SpecifiedWindowFrame])
case other => (lead, lead.frame.asInstanceOf[SpecifiedWindowFrame])
}
case lag @ Lag(input, offset, default, ignoreNulls) =>
// When the offset value of the lag is negative, will convert to lead function
lag.offset match {
case IntegerLiteral(value) if value > 0 =>
val newWf = Lead(input, Literal(value), default, ignoreNulls)
(newWf, newWf.frame.asInstanceOf[SpecifiedWindowFrame])
case other => (lag, lag.frame.asInstanceOf[SpecifiedWindowFrame])
}
}
val childrenNodeList = new JArrayList[ExpressionNode]()
childrenNodeList.add(
ExpressionConverter
.replaceWithExpressionTransformer(
offsetWf.input,
attributeSeq = originalInputAttributes)
.doTransform(context))
childrenNodeList.add(
ExpressionConverter
.replaceWithExpressionTransformer(
offsetWf.offset,
attributeSeq = originalInputAttributes)
.doTransform(context))
childrenNodeList.add(
ExpressionConverter
.replaceWithExpressionTransformer(
offsetWf.default,
attributeSeq = originalInputAttributes)
.doTransform(context))
val windowFunctionNode = ExpressionBuilder.makeWindowFunction(
WindowFunctionsBuilder.create(context, offsetWf).toInt,
childrenNodeList,
columnName,
ConverterUtils.getTypeNode(offsetWf.dataType, offsetWf.nullable),
frame.upper,
frame.lower,
frame.frameType.sql,
originalInputAttributes.asJava
)
windowExpressionNodes.add(windowFunctionNode)
case wf @ NTile(buckets: Expression) =>
val frame = wExpression.windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
val childrenNodeList = new JArrayList[ExpressionNode]()
val literal = buckets.asInstanceOf[Literal]
childrenNodeList.add(LiteralTransformer(literal).doTransform(context))
val windowFunctionNode = ExpressionBuilder.makeWindowFunction(
WindowFunctionsBuilder.create(context, wf).toInt,
childrenNodeList,
columnName,
ConverterUtils.getTypeNode(wf.dataType, wf.nullable),
frame.upper,
frame.lower,
frame.frameType.sql,
originalInputAttributes.asJava
)
windowExpressionNodes.add(windowFunctionNode)
case _ =>
throw new GlutenNotSupportException(
"unsupported window function type: " +
wExpression.windowFunction)
}
}
}