in datafusion/proto/src/physical_plan/to_proto.rs [60:198]
fn try_from(a: Arc<dyn AggregateExpr>) -> Result<Self, Self::Error> {
use datafusion::physical_plan::expressions;
use protobuf::AggregateFunction;
let expressions: Vec<protobuf::PhysicalExprNode> = a
.expressions()
.iter()
.map(|e| e.clone().try_into())
.collect::<Result<Vec<_>>>()?;
let ordering_req: Vec<protobuf::PhysicalSortExprNode> = a
.order_bys()
.unwrap_or(&[])
.iter()
.map(|e| e.clone().try_into())
.collect::<Result<Vec<_>>>()?;
let mut distinct = false;
let aggr_function = if a.as_any().downcast_ref::<Avg>().is_some() {
Ok(AggregateFunction::Avg.into())
} else if a.as_any().downcast_ref::<Sum>().is_some() {
Ok(AggregateFunction::Sum.into())
} else if a.as_any().downcast_ref::<Count>().is_some() {
Ok(AggregateFunction::Count.into())
} else if a.as_any().downcast_ref::<BitAnd>().is_some() {
Ok(AggregateFunction::BitAnd.into())
} else if a.as_any().downcast_ref::<BitOr>().is_some() {
Ok(AggregateFunction::BitOr.into())
} else if a.as_any().downcast_ref::<BitXor>().is_some() {
Ok(AggregateFunction::BitXor.into())
} else if a.as_any().downcast_ref::<BoolAnd>().is_some() {
Ok(AggregateFunction::BoolAnd.into())
} else if a.as_any().downcast_ref::<BoolOr>().is_some() {
Ok(AggregateFunction::BoolOr.into())
} else if a.as_any().downcast_ref::<DistinctCount>().is_some() {
distinct = true;
Ok(AggregateFunction::Count.into())
} else if a.as_any().downcast_ref::<Min>().is_some() {
Ok(AggregateFunction::Min.into())
} else if a.as_any().downcast_ref::<Max>().is_some() {
Ok(AggregateFunction::Max.into())
} else if a
.as_any()
.downcast_ref::<expressions::ApproxDistinct>()
.is_some()
{
Ok(AggregateFunction::ApproxDistinct.into())
} else if a.as_any().downcast_ref::<expressions::ArrayAgg>().is_some() {
Ok(AggregateFunction::ArrayAgg.into())
} else if a.as_any().downcast_ref::<expressions::Variance>().is_some() {
Ok(AggregateFunction::Variance.into())
} else if a
.as_any()
.downcast_ref::<expressions::VariancePop>()
.is_some()
{
Ok(AggregateFunction::VariancePop.into())
} else if a
.as_any()
.downcast_ref::<expressions::Covariance>()
.is_some()
{
Ok(AggregateFunction::Covariance.into())
} else if a
.as_any()
.downcast_ref::<expressions::CovariancePop>()
.is_some()
{
Ok(AggregateFunction::CovariancePop.into())
} else if a.as_any().downcast_ref::<expressions::Stddev>().is_some() {
Ok(AggregateFunction::Stddev.into())
} else if a
.as_any()
.downcast_ref::<expressions::StddevPop>()
.is_some()
{
Ok(AggregateFunction::StddevPop.into())
} else if a
.as_any()
.downcast_ref::<expressions::Correlation>()
.is_some()
{
Ok(AggregateFunction::Correlation.into())
} else if a
.as_any()
.downcast_ref::<expressions::ApproxPercentileCont>()
.is_some()
{
Ok(AggregateFunction::ApproxPercentileCont.into())
} else if a
.as_any()
.downcast_ref::<expressions::ApproxPercentileContWithWeight>()
.is_some()
{
Ok(AggregateFunction::ApproxPercentileContWithWeight.into())
} else if a
.as_any()
.downcast_ref::<expressions::ApproxMedian>()
.is_some()
{
Ok(AggregateFunction::ApproxMedian.into())
} else if a.as_any().is::<expressions::FirstValue>() {
Ok(AggregateFunction::FirstValueAgg.into())
} else if a.as_any().is::<expressions::LastValue>() {
Ok(AggregateFunction::LastValueAgg.into())
} else {
if let Some(a) = a.as_any().downcast_ref::<AggregateFunctionExpr>() {
return Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
protobuf::PhysicalAggregateExprNode {
aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(a.fun().name.clone())),
expr: expressions,
ordering_req,
distinct,
},
)),
});
}
Err(DataFusionError::NotImplemented(format!(
"Aggregate function not supported: {a:?}"
)))
}?;
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
protobuf::PhysicalAggregateExprNode {
aggregate_function: Some(
physical_aggregate_expr_node::AggregateFunction::AggrFunction(
aggr_function,
),
),
expr: expressions,
ordering_req,
distinct,
},
)),
})
}