in spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala [183:346]
/**
 * Serializes a Spark [[WindowExpression]] into an `OperatorOuterClass.WindowExpr` protobuf
 * message, or returns `None` when the expression cannot be represented.
 *
 * The window function is serialized in one of two ways:
 *   - If the expression tree contains a supported aggregate (`Count`; `Min`/`Max` whose data type
 *     passes `AggSerde.minMaxDataTypeSupported`; `Sum` whose data type passes
 *     `AggSerde.sumDataTypeSupported` and is not a `DecimalType`), the first such aggregate is
 *     serialized with `aggExprToProto`.
 *   - Otherwise `windowExpr.windowFunction` is serialized with `exprToProto`.
 *
 * The frame is taken from `windowExpr.windowSpec.frameSpecification`. A `SpecifiedWindowFrame` is
 * translated bound by bound; any other frame specification is emitted as a `Rows` frame from
 * unbounded preceding to unbounded following.
 *
 * Offset bounds are only serialized when they evaluate to an `Integer` or `Long` and point in
 * the direction the protobuf variant names: a lower bound must not lie after the current row and
 * an upper bound must not lie before it. Any other offset bound makes the method return `None`
 * after reporting the reason through `withInfo`.
 *
 * @param windowExpr
 *   the window expression to serialize
 * @param output
 *   attributes handed to `aggExprToProto` / `exprToProto` when serializing the window function
 * @param conf
 *   SQL configuration handed to `aggExprToProto`
 * @return
 *   the serialized window expression, or `None` if any part of it is unsupported
 */
def windowExprToProto(
    windowExpr: WindowExpression,
    output: Seq[Attribute],
    conf: SQLConf): Option[OperatorOuterClass.WindowExpr] = {
  // Collect every aggregate in the expression tree that can be serialized. Aggregates that
  // cannot are dropped after the reason has been reported through withInfo.
  val aggregateExpressions: Array[AggregateExpression] = windowExpr.flatMap { expr =>
    expr match {
      case agg: AggregateExpression =>
        agg.aggregateFunction match {
          case _: Count =>
            Some(agg)
          case min: Min =>
            if (AggSerde.minMaxDataTypeSupported(min.dataType)) {
              Some(agg)
            } else {
              withInfo(windowExpr, s"datatype ${min.dataType} is not supported", expr)
              None
            }
          case max: Max =>
            if (AggSerde.minMaxDataTypeSupported(max.dataType)) {
              Some(agg)
            } else {
              withInfo(windowExpr, s"datatype ${max.dataType} is not supported", expr)
              None
            }
          case s: Sum =>
            // Decimal sums are excluded here even when sumDataTypeSupported accepts the type.
            if (AggSerde.sumDataTypeSupported(s.dataType) && !s.dataType
                .isInstanceOf[DecimalType]) {
              Some(agg)
            } else {
              withInfo(windowExpr, s"datatype ${s.dataType} is not supported", expr)
              None
            }
          case _ =>
            withInfo(
              windowExpr,
              s"aggregate ${agg.aggregateFunction}" +
                " is not supported for window function",
              expr)
            None
        }
      case _ =>
        None
    }
  }.toArray

  // Exactly one of the two is expected to be defined: the aggregate form when a supported
  // aggregate was found, the built-in function form otherwise.
  val (aggExpr, builtinFunc) = if (aggregateExpressions.nonEmpty) {
    val modes = aggregateExpressions.map(_.mode).distinct
    assert(modes.size == 1 && modes.head == Complete)
    (aggExprToProto(aggregateExpressions.head, output, true, conf), None)
  } else {
    (None, exprToProto(windowExpr.windowFunction, output))
  }

  if (aggExpr.isEmpty && builtinFunc.isEmpty) {
    return None
  }

  // Converts an evaluated frame-bound value to a Long offset. Only Integer and Long values are
  // handled; anything else yields None so the caller can fall back.
  def integralOffset(value: Any): Option[Long] = value match {
    case i: Integer => Some(i.longValue)
    case l: Long => Some(l)
    case _ => None
  }

  val frameSpec = windowExpr.windowSpec.frameSpecification
  val (frameType, lowerBound, upperBound) = frameSpec match {
    case SpecifiedWindowFrame(sparkFrameType, lBound, uBound) =>
      val frameProto = sparkFrameType match {
        case RowFrame => OperatorOuterClass.WindowFrameType.Rows
        case RangeFrame => OperatorOuterClass.WindowFrameType.Range
      }

      val lBoundProto = lBound match {
        case UnboundedPreceding =>
          OperatorOuterClass.LowerWindowFrameBound
            .newBuilder()
            .setUnboundedPreceding(OperatorOuterClass.UnboundedPreceding.newBuilder().build())
            .build()
        case CurrentRow =>
          OperatorOuterClass.LowerWindowFrameBound
            .newBuilder()
            .setCurrentRow(OperatorOuterClass.CurrentRow.newBuilder().build())
            .build()
        case e =>
          integralOffset(e.eval()) match {
            // Spark frame offsets are signed relative to the current row: negative values are
            // PRECEDING, positive values are FOLLOWING. Only the non-positive case matches the
            // Preceding message; the raw signed value is forwarded unchanged.
            case Some(offset) if offset <= 0 =>
              OperatorOuterClass.LowerWindowFrameBound
                .newBuilder()
                .setPreceding(
                  OperatorOuterClass.Preceding
                    .newBuilder()
                    .setOffset(offset)
                    .build())
                .build()
            case Some(_) =>
              withInfo(
                windowExpr,
                s"lower window frame bound $e after the current row is not supported",
                e)
              return None
            case None =>
              withInfo(windowExpr, s"window frame bound $e is not supported", e)
              return None
          }
      }

      val uBoundProto = uBound match {
        case UnboundedFollowing =>
          OperatorOuterClass.UpperWindowFrameBound
            .newBuilder()
            .setUnboundedFollowing(OperatorOuterClass.UnboundedFollowing.newBuilder().build())
            .build()
        case CurrentRow =>
          OperatorOuterClass.UpperWindowFrameBound
            .newBuilder()
            .setCurrentRow(OperatorOuterClass.CurrentRow.newBuilder().build())
            .build()
        case e =>
          integralOffset(e.eval()) match {
            // Mirror of the lower bound: only a non-negative offset matches the Following
            // message.
            case Some(offset) if offset >= 0 =>
              OperatorOuterClass.UpperWindowFrameBound
                .newBuilder()
                .setFollowing(
                  OperatorOuterClass.Following
                    .newBuilder()
                    .setOffset(offset)
                    .build())
                .build()
            case Some(_) =>
              withInfo(
                windowExpr,
                s"upper window frame bound $e before the current row is not supported",
                e)
              return None
            case None =>
              withInfo(windowExpr, s"window frame bound $e is not supported", e)
              return None
          }
      }

      (frameProto, lBoundProto, uBoundProto)

    case _ =>
      // Any frame specification other than SpecifiedWindowFrame is emitted as a Rows frame
      // spanning unbounded preceding to unbounded following.
      (
        OperatorOuterClass.WindowFrameType.Rows,
        OperatorOuterClass.LowerWindowFrameBound
          .newBuilder()
          .setUnboundedPreceding(OperatorOuterClass.UnboundedPreceding.newBuilder().build())
          .build(),
        OperatorOuterClass.UpperWindowFrameBound
          .newBuilder()
          .setUnboundedFollowing(OperatorOuterClass.UnboundedFollowing.newBuilder().build())
          .build())
  }

  val frame = OperatorOuterClass.WindowFrame
    .newBuilder()
    .setFrameType(frameType)
    .setLowerBound(lowerBound)
    .setUpperBound(upperBound)
    .build()

  val spec =
    OperatorOuterClass.WindowSpecDefinition.newBuilder().setFrameSpecification(frame).build()

  // The built-in function form takes precedence, matching the order in which the two options
  // were checked before; at least one of them is defined at this point.
  builtinFunc
    .map { func =>
      OperatorOuterClass.WindowExpr
        .newBuilder()
        .setBuiltInWindowFunction(func)
        .setSpec(spec)
        .build()
    }
    .orElse(aggExpr.map { agg =>
      OperatorOuterClass.WindowExpr
        .newBuilder()
        .setAggFunc(agg)
        .setSpec(spec)
        .build()
    })
}