in spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala [348:387]
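/**
 * Convert a Spark aggregate expression into a Comet protobuf aggregate expression.
 * Returns None (after attaching diagnostic info via withInfo) when the aggregate
 * is distinct or when the aggregate function is not supported by Comet.
 */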
def aggExprToProto(
    aggExpr: AggregateExpression,
    inputs: Seq[Attribute],
    binding: Boolean,
    conf: SQLConf): Option[AggExpr] = {
  if (aggExpr.isDistinct) {
    // https://github.com/apache/datafusion-comet/issues/1260
    withInfo(aggExpr, "distinct aggregates are not supported")
    return None
  }
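  // Map the Spark aggregate function to the Comet serde object that knows how to serialize it.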
  val cometExpr: CometAggregateExpressionSerde = aggExpr.aggregateFunction match {
    case _: Sum => CometSum
    case _: Average => CometAverage
    case _: Count => CometCount
    case _: Min => CometMin
    case _: Max => CometMax
    case _: First => CometFirst
    case _: Last => CometLast
    case _: BitAndAgg => CometBitAndAgg
    case _: BitOrAgg => CometBitOrAgg
    case _: BitXorAgg => CometBitXOrAgg
    case _: CovSample => CometCovSample
    case _: CovPopulation => CometCovPopulation
    case _: VarianceSamp => CometVarianceSamp
    case _: VariancePop => CometVariancePop
    case _: StddevSamp => CometStddevSamp
    case _: StddevPop => CometStddevPop
    case _: Corr => CometCorr
    case _: BloomFilterAggregate => CometBloomFilterAggregate
    case fn =>
      val msg = s"unsupported Spark aggregate function: ${fn.prettyName}"
      emitWarning(msg)
      withInfo(aggExpr, msg, fn.children: _*)
      return None
  }
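  // Delegate the actual protobuf conversion to the matched serde implementation.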
  cometExpr.convert(aggExpr, aggExpr.aggregateFunction, inputs, binding, conf)
}
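The sketch below is a self-contained toy model of the dispatch pattern used above: a pattern match selects a per-function serde object, and unsupported functions fall through to None. All "Toy*" names are hypothetical stand-ins introduced for illustration; they are not the real Spark or Comet types.

// Stand-in for the serialized aggregate expression (the role AggExpr plays above).
case class ToyAggExpr(name: String, childCount: Int)

// Stand-in for a Spark aggregate function node.
sealed trait ToyAggFn { def children: Seq[String] }
case class ToySum(children: Seq[String]) extends ToyAggFn
case class ToyCount(children: Seq[String]) extends ToyAggFn
case class ToyMedian(children: Seq[String]) extends ToyAggFn

// Stand-in for CometAggregateExpressionSerde: one object per supported function.
trait ToyAggSerde {
  def convert(fn: ToyAggFn): Option[ToyAggExpr]
}

object ToySumSerde extends ToyAggSerde {
  override def convert(fn: ToyAggFn): Option[ToyAggExpr] =
    Some(ToyAggExpr("sum", fn.children.length))
}

object ToyCountSerde extends ToyAggSerde {
  override def convert(fn: ToyAggFn): Option[ToyAggExpr] =
    Some(ToyAggExpr("count", fn.children.length))
}

// Dispatch mirrors aggExprToProto: match picks the serde, unknown functions return None.
def toyAggToProto(fn: ToyAggFn): Option[ToyAggExpr] = {
  val serde: ToyAggSerde = fn match {
    case _: ToySum => ToySumSerde
    case _: ToyCount => ToyCountSerde
    case other =>
      println(s"unsupported aggregate function: $other")
      return None
  }
  serde.convert(fn)
}

// Usage: supported functions serialize, anything else yields None.
println(toyAggToProto(ToySum(Seq("col_a"))))    // Some(ToyAggExpr(sum,1))
println(toyAggToProto(ToyMedian(Seq("col_a")))) // None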