fn try_from()

in datafusion/proto/src/physical_plan/to_proto.rs [60:198]


    fn try_from(a: Arc<dyn AggregateExpr>) -> Result<Self, Self::Error> {
        use datafusion::physical_plan::expressions;
        use protobuf::AggregateFunction;

        let expressions: Vec<protobuf::PhysicalExprNode> = a
            .expressions()
            .iter()
            .map(|e| e.clone().try_into())
            .collect::<Result<Vec<_>>>()?;

        let ordering_req: Vec<protobuf::PhysicalSortExprNode> = a
            .order_bys()
            .unwrap_or(&[])
            .iter()
            .map(|e| e.clone().try_into())
            .collect::<Result<Vec<_>>>()?;

        let mut distinct = false;
        let aggr_function = if a.as_any().downcast_ref::<Avg>().is_some() {
            Ok(AggregateFunction::Avg.into())
        } else if a.as_any().downcast_ref::<Sum>().is_some() {
            Ok(AggregateFunction::Sum.into())
        } else if a.as_any().downcast_ref::<Count>().is_some() {
            Ok(AggregateFunction::Count.into())
        } else if a.as_any().downcast_ref::<BitAnd>().is_some() {
            Ok(AggregateFunction::BitAnd.into())
        } else if a.as_any().downcast_ref::<BitOr>().is_some() {
            Ok(AggregateFunction::BitOr.into())
        } else if a.as_any().downcast_ref::<BitXor>().is_some() {
            Ok(AggregateFunction::BitXor.into())
        } else if a.as_any().downcast_ref::<BoolAnd>().is_some() {
            Ok(AggregateFunction::BoolAnd.into())
        } else if a.as_any().downcast_ref::<BoolOr>().is_some() {
            Ok(AggregateFunction::BoolOr.into())
        } else if a.as_any().downcast_ref::<DistinctCount>().is_some() {
            distinct = true;
            Ok(AggregateFunction::Count.into())
        } else if a.as_any().downcast_ref::<Min>().is_some() {
            Ok(AggregateFunction::Min.into())
        } else if a.as_any().downcast_ref::<Max>().is_some() {
            Ok(AggregateFunction::Max.into())
        } else if a
            .as_any()
            .downcast_ref::<expressions::ApproxDistinct>()
            .is_some()
        {
            Ok(AggregateFunction::ApproxDistinct.into())
        } else if a.as_any().downcast_ref::<expressions::ArrayAgg>().is_some() {
            Ok(AggregateFunction::ArrayAgg.into())
        } else if a.as_any().downcast_ref::<expressions::Variance>().is_some() {
            Ok(AggregateFunction::Variance.into())
        } else if a
            .as_any()
            .downcast_ref::<expressions::VariancePop>()
            .is_some()
        {
            Ok(AggregateFunction::VariancePop.into())
        } else if a
            .as_any()
            .downcast_ref::<expressions::Covariance>()
            .is_some()
        {
            Ok(AggregateFunction::Covariance.into())
        } else if a
            .as_any()
            .downcast_ref::<expressions::CovariancePop>()
            .is_some()
        {
            Ok(AggregateFunction::CovariancePop.into())
        } else if a.as_any().downcast_ref::<expressions::Stddev>().is_some() {
            Ok(AggregateFunction::Stddev.into())
        } else if a
            .as_any()
            .downcast_ref::<expressions::StddevPop>()
            .is_some()
        {
            Ok(AggregateFunction::StddevPop.into())
        } else if a
            .as_any()
            .downcast_ref::<expressions::Correlation>()
            .is_some()
        {
            Ok(AggregateFunction::Correlation.into())
        } else if a
            .as_any()
            .downcast_ref::<expressions::ApproxPercentileCont>()
            .is_some()
        {
            Ok(AggregateFunction::ApproxPercentileCont.into())
        } else if a
            .as_any()
            .downcast_ref::<expressions::ApproxPercentileContWithWeight>()
            .is_some()
        {
            Ok(AggregateFunction::ApproxPercentileContWithWeight.into())
        } else if a
            .as_any()
            .downcast_ref::<expressions::ApproxMedian>()
            .is_some()
        {
            Ok(AggregateFunction::ApproxMedian.into())
        } else if a.as_any().is::<expressions::FirstValue>() {
            Ok(AggregateFunction::FirstValueAgg.into())
        } else if a.as_any().is::<expressions::LastValue>() {
            Ok(AggregateFunction::LastValueAgg.into())
        } else {
            if let Some(a) = a.as_any().downcast_ref::<AggregateFunctionExpr>() {
                return Ok(protobuf::PhysicalExprNode {
                    expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
                        protobuf::PhysicalAggregateExprNode {
                            aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(a.fun().name.clone())),
                            expr: expressions,
                            ordering_req,
                            distinct,
                        },
                    )),
                });
            }

            Err(DataFusionError::NotImplemented(format!(
                "Aggregate function not supported: {a:?}"
            )))
        }?;

        Ok(protobuf::PhysicalExprNode {
            expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
                protobuf::PhysicalAggregateExprNode {
                    aggregate_function: Some(
                        physical_aggregate_expr_node::AggregateFunction::AggrFunction(
                            aggr_function,
                        ),
                    ),
                    expr: expressions,
                    ordering_req,
                    distinct,
                },
            )),
        })
    }