fn try_from()

in datafusion/proto/src/logical_plan/to_proto.rs [476:1067]


    /// Converts a logical `Expr` into its protobuf expression node.
    ///
    /// Each `Expr` variant is handled by its own match arm (there is no
    /// catch-all arm), and child expressions are converted recursively through
    /// the same `try_into` conversion.
    ///
    /// # Errors
    ///
    /// Returns `Error::General` for variants this function does not serialize:
    /// `Expr::ScalarVariable`, `Expr::ScalarSubquery`, `Expr::InSubquery`,
    /// `Expr::Exists`, `Expr::OuterReferenceColumn` and
    /// `Expr::QualifiedWildcard`. Any error raised while converting a nested
    /// expression, scalar value, data type, scalar function or window frame is
    /// propagated with `?`.
    fn try_from(expr: &Expr) -> Result<Self, Self::Error> {
        use protobuf::logical_expr_node::ExprType;

        let expr_node = match expr {
            Expr::Column(c) => Self {
                expr_type: Some(ExprType::Column(c.into())),
            },
            // Only the aliased expression and the alias name are written; any
            // other field of `Alias` (skipped by `..`) is not serialized.
            Expr::Alias(Alias { expr, name, .. }) => {
                let alias = Box::new(protobuf::AliasNode {
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                    alias: name.to_owned(),
                });
                Self {
                    expr_type: Some(ExprType::Alias(alias)),
                }
            }
            Expr::Literal(value) => {
                let pb_value: protobuf::ScalarValue = value.try_into()?;
                Self {
                    expr_type: Some(ExprType::Literal(pb_value)),
                }
            }
            Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
                // Try to linearize a nested binary expression tree of the same operator
                // into a flat vector of expressions.
                //
                // Only the left spine is walked: a right operand that is itself a
                // binary expression with the same operator is kept as one operand.
                let mut exprs = vec![right.as_ref()];
                let mut current_expr = left.as_ref();
                while let Expr::BinaryExpr(BinaryExpr {
                    left,
                    op: current_op,
                    right,
                }) = current_expr
                {
                    if current_op == op {
                        exprs.push(right.as_ref());
                        current_expr = left.as_ref();
                    } else {
                        break;
                    }
                }
                // The innermost left operand (the first one that is not a binary
                // expression with the same operator) terminates the chain.
                exprs.push(current_expr);

                let binary_expr = protobuf::BinaryExprNode {
                    // We need to reverse exprs since operands are expected to be
                    // linearized from left innermost to right outermost (but while
                    // traversing the chain we do the exact opposite).
                    operands: exprs
                        .into_iter()
                        .rev()
                        .map(|expr| expr.try_into())
                        .collect::<Result<Vec<_>, Error>>()?,
                    // The operator is encoded as its `Debug` rendering.
                    op: format!("{op:?}"),
                };
                Self {
                    expr_type: Some(ExprType::BinaryExpr(binary_expr)),
                }
            }
            // `case_insensitive` selects between the ILIKE and LIKE node types. An
            // absent escape character is encoded as an empty string.
            Expr::Like(Like {
                negated,
                expr,
                pattern,
                escape_char,
                case_insensitive,
            }) => {
                if *case_insensitive {
                    let pb = Box::new(protobuf::ILikeNode {
                        negated: *negated,
                        expr: Some(Box::new(expr.as_ref().try_into()?)),
                        pattern: Some(Box::new(pattern.as_ref().try_into()?)),
                        escape_char: escape_char
                            .map(|ch| ch.to_string())
                            .unwrap_or_default(),
                    });

                    Self {
                        expr_type: Some(ExprType::Ilike(pb)),
                    }
                } else {
                    let pb = Box::new(protobuf::LikeNode {
                        negated: *negated,
                        expr: Some(Box::new(expr.as_ref().try_into()?)),
                        pattern: Some(Box::new(pattern.as_ref().try_into()?)),
                        escape_char: escape_char
                            .map(|ch| ch.to_string())
                            .unwrap_or_default(),
                    });

                    Self {
                        expr_type: Some(ExprType::Like(pb)),
                    }
                }
            }
            // The `case_insensitive` flag is deliberately ignored here: it is not
            // written to `SimilarToNode`.
            Expr::SimilarTo(Like {
                negated,
                expr,
                pattern,
                escape_char,
                case_insensitive: _,
            }) => {
                let pb = Box::new(protobuf::SimilarToNode {
                    negated: *negated,
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                    pattern: Some(Box::new(pattern.as_ref().try_into()?)),
                    escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
                });
                Self {
                    expr_type: Some(ExprType::SimilarTo(pb)),
                }
            }
            Expr::WindowFunction(expr::WindowFunction {
                ref fun,
                ref args,
                ref partition_by,
                ref order_by,
                ref window_frame,
            }) => {
                // Built-in functions are encoded by enum value; user-defined
                // aggregate/window functions are encoded by name only.
                let window_function = match fun {
                    WindowFunction::AggregateFunction(fun) => {
                        protobuf::window_expr_node::WindowFunction::AggrFunction(
                            protobuf::AggregateFunction::from(fun).into(),
                        )
                    }
                    WindowFunction::BuiltInWindowFunction(fun) => {
                        protobuf::window_expr_node::WindowFunction::BuiltInFunction(
                            protobuf::BuiltInWindowFunction::from(fun).into(),
                        )
                    }
                    WindowFunction::AggregateUDF(aggr_udf) => {
                        protobuf::window_expr_node::WindowFunction::Udaf(
                            aggr_udf.name.clone(),
                        )
                    }
                    WindowFunction::WindowUDF(window_udf) => {
                        protobuf::window_expr_node::WindowFunction::Udwf(
                            window_udf.name.clone(),
                        )
                    }
                };
                // NOTE(review): only the first argument is serialized; any further
                // arguments in `args` are silently dropped. Confirm this is
                // acceptable for multi-argument window functions.
                let arg_expr: Option<Box<Self>> = if !args.is_empty() {
                    let arg = &args[0];
                    Some(Box::new(arg.try_into()?))
                } else {
                    None
                };
                let partition_by = partition_by
                    .iter()
                    .map(|e| e.try_into())
                    .collect::<Result<Vec<_>, _>>()?;
                let order_by = order_by
                    .iter()
                    .map(|e| e.try_into())
                    .collect::<Result<Vec<_>, _>>()?;

                let window_frame: Option<protobuf::WindowFrame> =
                    Some(window_frame.try_into()?);
                let window_expr = Box::new(protobuf::WindowExprNode {
                    expr: arg_expr,
                    window_function: Some(window_function),
                    partition_by,
                    order_by,
                    window_frame,
                });
                Self {
                    expr_type: Some(ExprType::WindowExpr(window_expr)),
                }
            }
            Expr::AggregateFunction(expr::AggregateFunction {
                ref fun,
                ref args,
                ref distinct,
                ref filter,
                ref order_by,
            }) => {
                // One-to-one mapping onto the protobuf enum. Note that a few
                // protobuf names differ from the logical ones (`RegrSxx`/`RegrSyy`/
                // `RegrSxy`, `FirstValueAgg`, `LastValueAgg`).
                let aggr_function = match fun {
                    AggregateFunction::ApproxDistinct => {
                        protobuf::AggregateFunction::ApproxDistinct
                    }
                    AggregateFunction::ApproxPercentileCont => {
                        protobuf::AggregateFunction::ApproxPercentileCont
                    }
                    AggregateFunction::ApproxPercentileContWithWeight => {
                        protobuf::AggregateFunction::ApproxPercentileContWithWeight
                    }
                    AggregateFunction::ArrayAgg => protobuf::AggregateFunction::ArrayAgg,
                    AggregateFunction::Min => protobuf::AggregateFunction::Min,
                    AggregateFunction::Max => protobuf::AggregateFunction::Max,
                    AggregateFunction::Sum => protobuf::AggregateFunction::Sum,
                    AggregateFunction::BitAnd => protobuf::AggregateFunction::BitAnd,
                    AggregateFunction::BitOr => protobuf::AggregateFunction::BitOr,
                    AggregateFunction::BitXor => protobuf::AggregateFunction::BitXor,
                    AggregateFunction::BoolAnd => protobuf::AggregateFunction::BoolAnd,
                    AggregateFunction::BoolOr => protobuf::AggregateFunction::BoolOr,
                    AggregateFunction::Avg => protobuf::AggregateFunction::Avg,
                    AggregateFunction::Count => protobuf::AggregateFunction::Count,
                    AggregateFunction::Variance => protobuf::AggregateFunction::Variance,
                    AggregateFunction::VariancePop => {
                        protobuf::AggregateFunction::VariancePop
                    }
                    AggregateFunction::Covariance => {
                        protobuf::AggregateFunction::Covariance
                    }
                    AggregateFunction::CovariancePop => {
                        protobuf::AggregateFunction::CovariancePop
                    }
                    AggregateFunction::Stddev => protobuf::AggregateFunction::Stddev,
                    AggregateFunction::StddevPop => {
                        protobuf::AggregateFunction::StddevPop
                    }
                    AggregateFunction::Correlation => {
                        protobuf::AggregateFunction::Correlation
                    }
                    AggregateFunction::RegrSlope => {
                        protobuf::AggregateFunction::RegrSlope
                    }
                    AggregateFunction::RegrIntercept => {
                        protobuf::AggregateFunction::RegrIntercept
                    }
                    AggregateFunction::RegrR2 => protobuf::AggregateFunction::RegrR2,
                    AggregateFunction::RegrAvgx => protobuf::AggregateFunction::RegrAvgx,
                    AggregateFunction::RegrAvgy => protobuf::AggregateFunction::RegrAvgy,
                    AggregateFunction::RegrCount => {
                        protobuf::AggregateFunction::RegrCount
                    }
                    AggregateFunction::RegrSXX => protobuf::AggregateFunction::RegrSxx,
                    AggregateFunction::RegrSYY => protobuf::AggregateFunction::RegrSyy,
                    AggregateFunction::RegrSXY => protobuf::AggregateFunction::RegrSxy,
                    AggregateFunction::ApproxMedian => {
                        protobuf::AggregateFunction::ApproxMedian
                    }
                    AggregateFunction::Grouping => protobuf::AggregateFunction::Grouping,
                    AggregateFunction::Median => protobuf::AggregateFunction::Median,
                    AggregateFunction::FirstValue => {
                        protobuf::AggregateFunction::FirstValueAgg
                    }
                    AggregateFunction::LastValue => {
                        protobuf::AggregateFunction::LastValueAgg
                    }
                };

                let aggregate_expr = protobuf::AggregateExprNode {
                    aggr_function: aggr_function.into(),
                    expr: args
                        .iter()
                        .map(|v| v.try_into())
                        .collect::<Result<Vec<_>, _>>()?,
                    distinct: *distinct,
                    filter: match filter {
                        Some(e) => Some(Box::new(e.as_ref().try_into()?)),
                        None => None,
                    },
                    // A missing ORDER BY is encoded as an empty list.
                    order_by: match order_by {
                        Some(e) => e
                            .iter()
                            .map(|expr| expr.try_into())
                            .collect::<Result<Vec<_>, _>>()?,
                        None => vec![],
                    },
                };
                Self {
                    expr_type: Some(ExprType::AggregateExpr(Box::new(aggregate_expr))),
                }
            }
            Expr::ScalarVariable(_, _) => {
                return Err(Error::General(
                    "Proto serialization error: Scalar Variable not supported"
                        .to_string(),
                ))
            }
            Expr::ScalarFunction(ScalarFunction { fun, args }) => {
                let fun: protobuf::ScalarFunction = fun.try_into()?;
                let args: Vec<Self> = args
                    .iter()
                    .map(|e| e.try_into())
                    .collect::<Result<Vec<Self>, Error>>()?;
                Self {
                    expr_type: Some(ExprType::ScalarFunction(
                        protobuf::ScalarFunctionNode {
                            fun: fun.into(),
                            args,
                        },
                    )),
                }
            }
            // User-defined scalar functions are referenced by name only.
            Expr::ScalarUDF(ScalarUDF { fun, args }) => Self {
                expr_type: Some(ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
                    fun_name: fun.name.clone(),
                    args: args
                        .iter()
                        .map(|expr| expr.try_into())
                        .collect::<Result<Vec<_>, Error>>()?,
                })),
            },
            // User-defined aggregate functions are referenced by name only.
            Expr::AggregateUDF(expr::AggregateUDF {
                fun,
                args,
                filter,
                order_by,
            }) => Self {
                expr_type: Some(ExprType::AggregateUdfExpr(Box::new(
                    protobuf::AggregateUdfExprNode {
                        fun_name: fun.name.clone(),
                        args: args.iter().map(|expr| expr.try_into()).collect::<Result<
                            Vec<_>,
                            Error,
                        >>(
                        )?,
                        filter: match filter {
                            Some(e) => Some(Box::new(e.as_ref().try_into()?)),
                            None => None,
                        },
                        // A missing ORDER BY is encoded as an empty list.
                        order_by: match order_by {
                            Some(e) => e
                                .iter()
                                .map(|expr| expr.try_into())
                                .collect::<Result<Vec<_>, _>>()?,
                            None => vec![],
                        },
                    },
                ))),
            },
            // The unary predicates below all share one shape: wrap the converted
            // child expression in the matching protobuf message.
            Expr::Not(expr) => {
                let expr = Box::new(protobuf::Not {
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                });
                Self {
                    expr_type: Some(ExprType::NotExpr(expr)),
                }
            }
            Expr::IsNull(expr) => {
                let expr = Box::new(protobuf::IsNull {
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                });
                Self {
                    expr_type: Some(ExprType::IsNullExpr(expr)),
                }
            }
            Expr::IsNotNull(expr) => {
                let expr = Box::new(protobuf::IsNotNull {
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                });
                Self {
                    expr_type: Some(ExprType::IsNotNullExpr(expr)),
                }
            }
            Expr::IsTrue(expr) => {
                let expr = Box::new(protobuf::IsTrue {
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                });
                Self {
                    expr_type: Some(ExprType::IsTrue(expr)),
                }
            }
            Expr::IsFalse(expr) => {
                let expr = Box::new(protobuf::IsFalse {
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                });
                Self {
                    expr_type: Some(ExprType::IsFalse(expr)),
                }
            }
            Expr::IsUnknown(expr) => {
                let expr = Box::new(protobuf::IsUnknown {
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                });
                Self {
                    expr_type: Some(ExprType::IsUnknown(expr)),
                }
            }
            Expr::IsNotTrue(expr) => {
                let expr = Box::new(protobuf::IsNotTrue {
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                });
                Self {
                    expr_type: Some(ExprType::IsNotTrue(expr)),
                }
            }
            Expr::IsNotFalse(expr) => {
                let expr = Box::new(protobuf::IsNotFalse {
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                });
                Self {
                    expr_type: Some(ExprType::IsNotFalse(expr)),
                }
            }
            Expr::IsNotUnknown(expr) => {
                let expr = Box::new(protobuf::IsNotUnknown {
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                });
                Self {
                    expr_type: Some(ExprType::IsNotUnknown(expr)),
                }
            }
            Expr::Between(Between {
                expr,
                negated,
                low,
                high,
            }) => {
                let expr = Box::new(protobuf::BetweenNode {
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                    negated: *negated,
                    low: Some(Box::new(low.as_ref().try_into()?)),
                    high: Some(Box::new(high.as_ref().try_into()?)),
                });
                Self {
                    expr_type: Some(ExprType::Between(expr)),
                }
            }
            Expr::Case(case) => {
                // Each (WHEN, THEN) pair becomes one `WhenThen` message; the first
                // conversion failure aborts the whole collection.
                let when_then_expr = case
                    .when_then_expr
                    .iter()
                    .map(|(w, t)| {
                        Ok(protobuf::WhenThen {
                            when_expr: Some(w.as_ref().try_into()?),
                            then_expr: Some(t.as_ref().try_into()?),
                        })
                    })
                    .collect::<Result<Vec<protobuf::WhenThen>, Error>>()?;
                let expr = Box::new(protobuf::CaseNode {
                    // The base expression and the ELSE branch are both optional.
                    expr: match &case.expr {
                        Some(e) => Some(Box::new(e.as_ref().try_into()?)),
                        None => None,
                    },
                    when_then_expr,
                    else_expr: match &case.else_expr {
                        Some(e) => Some(Box::new(e.as_ref().try_into()?)),
                        None => None,
                    },
                });
                Self {
                    expr_type: Some(ExprType::Case(expr)),
                }
            }
            Expr::Cast(Cast { expr, data_type }) => {
                let expr = Box::new(protobuf::CastNode {
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                    arrow_type: Some(data_type.try_into()?),
                });
                Self {
                    expr_type: Some(ExprType::Cast(expr)),
                }
            }
            Expr::TryCast(TryCast { expr, data_type }) => {
                let expr = Box::new(protobuf::TryCastNode {
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                    arrow_type: Some(data_type.try_into()?),
                });
                Self {
                    expr_type: Some(ExprType::TryCast(expr)),
                }
            }
            Expr::Sort(Sort {
                expr,
                asc,
                nulls_first,
            }) => {
                let expr = Box::new(protobuf::SortExprNode {
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                    asc: *asc,
                    nulls_first: *nulls_first,
                });
                Self {
                    expr_type: Some(ExprType::Sort(expr)),
                }
            }
            Expr::Negative(expr) => {
                let expr = Box::new(protobuf::NegativeNode {
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                });
                Self {
                    expr_type: Some(ExprType::Negative(expr)),
                }
            }
            Expr::InList(InList {
                expr,
                list,
                negated,
            }) => {
                let expr = Box::new(protobuf::InListNode {
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                    list: list
                        .iter()
                        .map(|expr| expr.try_into())
                        .collect::<Result<Vec<_>, Error>>()?,
                    negated: *negated,
                });
                Self {
                    expr_type: Some(ExprType::InList(expr)),
                }
            }
            // The wildcard carries no data of its own, so a constant `true` is
            // written as the payload.
            Expr::Wildcard => Self {
                expr_type: Some(ExprType::Wildcard(true)),
            },
            Expr::ScalarSubquery(_)
            | Expr::InSubquery(_)
            | Expr::Exists { .. }
            | Expr::OuterReferenceColumn { .. } => {
                // we would need to add logical plan operators to datafusion.proto to support this
                // see discussion in https://github.com/apache/arrow-datafusion/issues/2565
                return Err(Error::General("Proto serialization error: Expr::ScalarSubquery(_) | Expr::InSubquery(_) | Expr::Exists { .. } | Expr::OuterReferenceColumn not supported".to_string()));
            }
            Expr::GetIndexedField(GetIndexedField { expr, field }) => {
                // Encode the kind of access (struct field by name, list element by
                // key, or list slice by start/stop) before wrapping the base
                // expression.
                let field = match field {
                    GetFieldAccess::NamedStructField { name } => {
                        protobuf::get_indexed_field::Field::NamedStructField(
                            protobuf::NamedStructField {
                                name: Some(name.try_into()?),
                            },
                        )
                    }
                    GetFieldAccess::ListIndex { key } => {
                        protobuf::get_indexed_field::Field::ListIndex(Box::new(
                            protobuf::ListIndex {
                                key: Some(Box::new(key.as_ref().try_into()?)),
                            },
                        ))
                    }
                    GetFieldAccess::ListRange { start, stop } => {
                        protobuf::get_indexed_field::Field::ListRange(Box::new(
                            protobuf::ListRange {
                                start: Some(Box::new(start.as_ref().try_into()?)),
                                stop: Some(Box::new(stop.as_ref().try_into()?)),
                            },
                        ))
                    }
                };

                Self {
                    expr_type: Some(ExprType::GetIndexedField(Box::new(
                        protobuf::GetIndexedField {
                            expr: Some(Box::new(expr.as_ref().try_into()?)),
                            field: Some(field),
                        },
                    ))),
                }
            }

            Expr::GroupingSet(GroupingSet::Cube(exprs)) => Self {
                expr_type: Some(ExprType::Cube(CubeNode {
                    expr: exprs.iter().map(|expr| expr.try_into()).collect::<Result<
                        Vec<_>,
                        Self::Error,
                    >>(
                    )?,
                })),
            },
            Expr::GroupingSet(GroupingSet::Rollup(exprs)) => Self {
                expr_type: Some(ExprType::Rollup(RollupNode {
                    expr: exprs.iter().map(|expr| expr.try_into()).collect::<Result<
                        Vec<_>,
                        Self::Error,
                    >>(
                    )?,
                })),
            },
            // Explicit grouping sets are a list of lists: each inner list becomes
            // one `LogicalExprList`.
            Expr::GroupingSet(GroupingSet::GroupingSets(exprs)) => Self {
                expr_type: Some(ExprType::GroupingSet(GroupingSetNode {
                    expr: exprs
                        .iter()
                        .map(|expr_list| {
                            Ok(LogicalExprList {
                                expr: expr_list
                                    .iter()
                                    .map(|expr| expr.try_into())
                                    .collect::<Result<Vec<_>, Self::Error>>()?,
                            })
                        })
                        .collect::<Result<Vec<_>, Self::Error>>()?,
                })),
            },
            Expr::Placeholder(Placeholder { id, data_type }) => {
                // The placeholder's data type is optional and only converted when
                // present.
                let data_type = match data_type {
                    Some(data_type) => Some(data_type.try_into()?),
                    None => None,
                };
                Self {
                    expr_type: Some(ExprType::Placeholder(PlaceholderNode {
                        id: id.clone(),
                        data_type,
                    })),
                }
            }

            Expr::QualifiedWildcard { .. } => return Err(Error::General(
                "Proto serialization error: Expr::QualifiedWildcard { .. } not supported"
                    .to_string(),
            )),
        };

        Ok(expr_node)
    }