in datafusion/proto/src/logical_plan/to_proto.rs [476:1067]
/// Converts a logical [`Expr`] into its protobuf node representation (`Self`).
///
/// Nested expressions are converted recursively through this same `TryFrom`
/// implementation. Left-deep chains of binary expressions that share one
/// operator (e.g. `a AND b AND c`) are emitted as a single `BinaryExprNode`
/// holding a flat operand list rather than as nested nodes.
///
/// # Errors
///
/// Returns `Error::General` for expressions that have no protobuf encoding
/// here: `Expr::ScalarVariable`, `Expr::ScalarSubquery`, `Expr::InSubquery`,
/// `Expr::Exists`, `Expr::OuterReferenceColumn` and `Expr::QualifiedWildcard`.
/// Any error raised while converting a nested expression, literal value,
/// data type, scalar function or window frame is propagated unchanged.
fn try_from(expr: &Expr) -> Result<Self, Self::Error> {
    use protobuf::logical_expr_node::ExprType;
    let expr_node = match expr {
        Expr::Column(c) => Self {
            expr_type: Some(ExprType::Column(c.into())),
        },
        Expr::Alias(Alias { expr, name, .. }) => {
            let alias = Box::new(protobuf::AliasNode {
                expr: Some(Box::new(expr.as_ref().try_into()?)),
                alias: name.to_owned(),
            });
            Self {
                expr_type: Some(ExprType::Alias(alias)),
            }
        }
        Expr::Literal(value) => {
            let pb_value: protobuf::ScalarValue = value.try_into()?;
            Self {
                expr_type: Some(ExprType::Literal(pb_value)),
            }
        }
        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
            // Linearize a left-deep chain of binary expressions that all use
            // the same operator into a flat list of operands instead of nested
            // nodes. Only the left spine is walked; a differing operator (or a
            // non-binary expression) on the left ends the chain.
            let mut exprs = vec![right.as_ref()];
            let mut current_expr = left.as_ref();
            while let Expr::BinaryExpr(BinaryExpr {
                left,
                op: current_op,
                right,
            }) = current_expr
            {
                if current_op == op {
                    exprs.push(right.as_ref());
                    current_expr = left.as_ref();
                } else {
                    break;
                }
            }
            exprs.push(current_expr);
            let binary_expr = protobuf::BinaryExprNode {
                // We need to reverse exprs since operands are expected to be
                // linearized from left innermost to right outermost (but while
                // traversing the chain we do the exact opposite).
                operands: exprs
                    .into_iter()
                    .rev()
                    .map(|expr| expr.try_into())
                    .collect::<Result<Vec<_>, Error>>()?,
                op: format!("{op:?}"),
            };
            Self {
                expr_type: Some(ExprType::BinaryExpr(binary_expr)),
            }
        }
        Expr::Like(Like {
            negated,
            expr,
            pattern,
            escape_char,
            case_insensitive,
        }) => {
            // Case-insensitive LIKE (ILIKE) has its own protobuf node type.
            if *case_insensitive {
                let pb = Box::new(protobuf::ILikeNode {
                    negated: *negated,
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                    pattern: Some(Box::new(pattern.as_ref().try_into()?)),
                    escape_char: escape_char
                        .map(|ch| ch.to_string())
                        .unwrap_or_default(),
                });
                Self {
                    expr_type: Some(ExprType::Ilike(pb)),
                }
            } else {
                let pb = Box::new(protobuf::LikeNode {
                    negated: *negated,
                    expr: Some(Box::new(expr.as_ref().try_into()?)),
                    pattern: Some(Box::new(pattern.as_ref().try_into()?)),
                    escape_char: escape_char
                        .map(|ch| ch.to_string())
                        .unwrap_or_default(),
                });
                Self {
                    expr_type: Some(ExprType::Like(pb)),
                }
            }
        }
        Expr::SimilarTo(Like {
            negated,
            expr,
            pattern,
            escape_char,
            // NOTE(review): the `case_insensitive` flag is not serialized for
            // SIMILAR TO, so a `true` value is lost on round-trip. Supporting it
            // would need a new field on `SimilarToNode` — confirm intent.
            case_insensitive: _,
        }) => {
            let pb = Box::new(protobuf::SimilarToNode {
                negated: *negated,
                expr: Some(Box::new(expr.as_ref().try_into()?)),
                pattern: Some(Box::new(pattern.as_ref().try_into()?)),
                escape_char: escape_char.map(|ch| ch.to_string()).unwrap_or_default(),
            });
            Self {
                expr_type: Some(ExprType::SimilarTo(pb)),
            }
        }
        Expr::WindowFunction(expr::WindowFunction {
            ref fun,
            ref args,
            ref partition_by,
            ref order_by,
            ref window_frame,
        }) => {
            let window_function = match fun {
                WindowFunction::AggregateFunction(fun) => {
                    protobuf::window_expr_node::WindowFunction::AggrFunction(
                        protobuf::AggregateFunction::from(fun).into(),
                    )
                }
                WindowFunction::BuiltInWindowFunction(fun) => {
                    protobuf::window_expr_node::WindowFunction::BuiltInFunction(
                        protobuf::BuiltInWindowFunction::from(fun).into(),
                    )
                }
                // User-defined functions are serialized by name only.
                WindowFunction::AggregateUDF(aggr_udf) => {
                    protobuf::window_expr_node::WindowFunction::Udaf(
                        aggr_udf.name.clone(),
                    )
                }
                WindowFunction::WindowUDF(window_udf) => {
                    protobuf::window_expr_node::WindowFunction::Udwf(
                        window_udf.name.clone(),
                    )
                }
            };
            // NOTE(review): `WindowExprNode` carries a single `expr`, so only the
            // first argument is serialized; any further arguments (e.g. an offset
            // or default value) are silently dropped. Fixing this needs a
            // repeated field in the protobuf schema.
            let arg_expr: Option<Box<Self>> = match args.first() {
                Some(arg) => Some(Box::new(arg.try_into()?)),
                None => None,
            };
            let partition_by = partition_by
                .iter()
                .map(|e| e.try_into())
                .collect::<Result<Vec<_>, _>>()?;
            let order_by = order_by
                .iter()
                .map(|e| e.try_into())
                .collect::<Result<Vec<_>, _>>()?;
            let window_frame: Option<protobuf::WindowFrame> =
                Some(window_frame.try_into()?);
            let window_expr = Box::new(protobuf::WindowExprNode {
                expr: arg_expr,
                window_function: Some(window_function),
                partition_by,
                order_by,
                window_frame,
            });
            Self {
                expr_type: Some(ExprType::WindowExpr(window_expr)),
            }
        }
        Expr::AggregateFunction(expr::AggregateFunction {
            ref fun,
            ref args,
            ref distinct,
            ref filter,
            ref order_by,
        }) => {
            // Exhaustive mapping so that adding an `AggregateFunction` variant
            // without a protobuf counterpart is a compile error here.
            let aggr_function = match fun {
                AggregateFunction::ApproxDistinct => {
                    protobuf::AggregateFunction::ApproxDistinct
                }
                AggregateFunction::ApproxPercentileCont => {
                    protobuf::AggregateFunction::ApproxPercentileCont
                }
                AggregateFunction::ApproxPercentileContWithWeight => {
                    protobuf::AggregateFunction::ApproxPercentileContWithWeight
                }
                AggregateFunction::ArrayAgg => protobuf::AggregateFunction::ArrayAgg,
                AggregateFunction::Min => protobuf::AggregateFunction::Min,
                AggregateFunction::Max => protobuf::AggregateFunction::Max,
                AggregateFunction::Sum => protobuf::AggregateFunction::Sum,
                AggregateFunction::BitAnd => protobuf::AggregateFunction::BitAnd,
                AggregateFunction::BitOr => protobuf::AggregateFunction::BitOr,
                AggregateFunction::BitXor => protobuf::AggregateFunction::BitXor,
                AggregateFunction::BoolAnd => protobuf::AggregateFunction::BoolAnd,
                AggregateFunction::BoolOr => protobuf::AggregateFunction::BoolOr,
                AggregateFunction::Avg => protobuf::AggregateFunction::Avg,
                AggregateFunction::Count => protobuf::AggregateFunction::Count,
                AggregateFunction::Variance => protobuf::AggregateFunction::Variance,
                AggregateFunction::VariancePop => {
                    protobuf::AggregateFunction::VariancePop
                }
                AggregateFunction::Covariance => {
                    protobuf::AggregateFunction::Covariance
                }
                AggregateFunction::CovariancePop => {
                    protobuf::AggregateFunction::CovariancePop
                }
                AggregateFunction::Stddev => protobuf::AggregateFunction::Stddev,
                AggregateFunction::StddevPop => {
                    protobuf::AggregateFunction::StddevPop
                }
                AggregateFunction::Correlation => {
                    protobuf::AggregateFunction::Correlation
                }
                AggregateFunction::RegrSlope => {
                    protobuf::AggregateFunction::RegrSlope
                }
                AggregateFunction::RegrIntercept => {
                    protobuf::AggregateFunction::RegrIntercept
                }
                AggregateFunction::RegrR2 => protobuf::AggregateFunction::RegrR2,
                AggregateFunction::RegrAvgx => protobuf::AggregateFunction::RegrAvgx,
                AggregateFunction::RegrAvgy => protobuf::AggregateFunction::RegrAvgy,
                AggregateFunction::RegrCount => {
                    protobuf::AggregateFunction::RegrCount
                }
                AggregateFunction::RegrSXX => protobuf::AggregateFunction::RegrSxx,
                AggregateFunction::RegrSYY => protobuf::AggregateFunction::RegrSyy,
                AggregateFunction::RegrSXY => protobuf::AggregateFunction::RegrSxy,
                AggregateFunction::ApproxMedian => {
                    protobuf::AggregateFunction::ApproxMedian
                }
                AggregateFunction::Grouping => protobuf::AggregateFunction::Grouping,
                AggregateFunction::Median => protobuf::AggregateFunction::Median,
                AggregateFunction::FirstValue => {
                    protobuf::AggregateFunction::FirstValueAgg
                }
                AggregateFunction::LastValue => {
                    protobuf::AggregateFunction::LastValueAgg
                }
            };
            let aggregate_expr = protobuf::AggregateExprNode {
                aggr_function: aggr_function.into(),
                expr: args
                    .iter()
                    .map(|v| v.try_into())
                    .collect::<Result<Vec<_>, _>>()?,
                distinct: *distinct,
                filter: match filter {
                    Some(e) => Some(Box::new(e.as_ref().try_into()?)),
                    None => None,
                },
                // A missing ORDER BY is encoded as an empty list.
                order_by: match order_by {
                    Some(e) => e
                        .iter()
                        .map(|expr| expr.try_into())
                        .collect::<Result<Vec<_>, _>>()?,
                    None => vec![],
                },
            };
            Self {
                expr_type: Some(ExprType::AggregateExpr(Box::new(aggregate_expr))),
            }
        }
        Expr::ScalarVariable(_, _) => {
            return Err(Error::General(
                "Proto serialization error: Scalar Variable not supported"
                    .to_string(),
            ))
        }
        Expr::ScalarFunction(ScalarFunction { fun, args }) => {
            let fun: protobuf::ScalarFunction = fun.try_into()?;
            let args: Vec<Self> = args
                .iter()
                .map(|e| e.try_into())
                .collect::<Result<Vec<Self>, Error>>()?;
            Self {
                expr_type: Some(ExprType::ScalarFunction(
                    protobuf::ScalarFunctionNode {
                        fun: fun.into(),
                        args,
                    },
                )),
            }
        }
        // Scalar UDFs are serialized by name plus their argument expressions.
        Expr::ScalarUDF(ScalarUDF { fun, args }) => Self {
            expr_type: Some(ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
                fun_name: fun.name.clone(),
                args: args
                    .iter()
                    .map(|expr| expr.try_into())
                    .collect::<Result<Vec<_>, Error>>()?,
            })),
        },
        Expr::AggregateUDF(expr::AggregateUDF {
            fun,
            args,
            filter,
            order_by,
        }) => Self {
            expr_type: Some(ExprType::AggregateUdfExpr(Box::new(
                protobuf::AggregateUdfExprNode {
                    fun_name: fun.name.clone(),
                    args: args
                        .iter()
                        .map(|expr| expr.try_into())
                        .collect::<Result<Vec<_>, Error>>()?,
                    filter: match filter {
                        Some(e) => Some(Box::new(e.as_ref().try_into()?)),
                        None => None,
                    },
                    // A missing ORDER BY is encoded as an empty list.
                    order_by: match order_by {
                        Some(e) => e
                            .iter()
                            .map(|expr| expr.try_into())
                            .collect::<Result<Vec<_>, _>>()?,
                        None => vec![],
                    },
                },
            ))),
        },
        Expr::Not(expr) => {
            let expr = Box::new(protobuf::Not {
                expr: Some(Box::new(expr.as_ref().try_into()?)),
            });
            Self {
                expr_type: Some(ExprType::NotExpr(expr)),
            }
        }
        Expr::IsNull(expr) => {
            let expr = Box::new(protobuf::IsNull {
                expr: Some(Box::new(expr.as_ref().try_into()?)),
            });
            Self {
                expr_type: Some(ExprType::IsNullExpr(expr)),
            }
        }
        Expr::IsNotNull(expr) => {
            let expr = Box::new(protobuf::IsNotNull {
                expr: Some(Box::new(expr.as_ref().try_into()?)),
            });
            Self {
                expr_type: Some(ExprType::IsNotNullExpr(expr)),
            }
        }
        Expr::IsTrue(expr) => {
            let expr = Box::new(protobuf::IsTrue {
                expr: Some(Box::new(expr.as_ref().try_into()?)),
            });
            Self {
                expr_type: Some(ExprType::IsTrue(expr)),
            }
        }
        Expr::IsFalse(expr) => {
            let expr = Box::new(protobuf::IsFalse {
                expr: Some(Box::new(expr.as_ref().try_into()?)),
            });
            Self {
                expr_type: Some(ExprType::IsFalse(expr)),
            }
        }
        Expr::IsUnknown(expr) => {
            let expr = Box::new(protobuf::IsUnknown {
                expr: Some(Box::new(expr.as_ref().try_into()?)),
            });
            Self {
                expr_type: Some(ExprType::IsUnknown(expr)),
            }
        }
        Expr::IsNotTrue(expr) => {
            let expr = Box::new(protobuf::IsNotTrue {
                expr: Some(Box::new(expr.as_ref().try_into()?)),
            });
            Self {
                expr_type: Some(ExprType::IsNotTrue(expr)),
            }
        }
        Expr::IsNotFalse(expr) => {
            let expr = Box::new(protobuf::IsNotFalse {
                expr: Some(Box::new(expr.as_ref().try_into()?)),
            });
            Self {
                expr_type: Some(ExprType::IsNotFalse(expr)),
            }
        }
        Expr::IsNotUnknown(expr) => {
            let expr = Box::new(protobuf::IsNotUnknown {
                expr: Some(Box::new(expr.as_ref().try_into()?)),
            });
            Self {
                expr_type: Some(ExprType::IsNotUnknown(expr)),
            }
        }
        Expr::Between(Between {
            expr,
            negated,
            low,
            high,
        }) => {
            let expr = Box::new(protobuf::BetweenNode {
                expr: Some(Box::new(expr.as_ref().try_into()?)),
                negated: *negated,
                low: Some(Box::new(low.as_ref().try_into()?)),
                high: Some(Box::new(high.as_ref().try_into()?)),
            });
            Self {
                expr_type: Some(ExprType::Between(expr)),
            }
        }
        Expr::Case(case) => {
            let when_then_expr = case
                .when_then_expr
                .iter()
                .map(|(w, t)| {
                    Ok(protobuf::WhenThen {
                        when_expr: Some(w.as_ref().try_into()?),
                        then_expr: Some(t.as_ref().try_into()?),
                    })
                })
                .collect::<Result<Vec<protobuf::WhenThen>, Error>>()?;
            let expr = Box::new(protobuf::CaseNode {
                expr: match &case.expr {
                    Some(e) => Some(Box::new(e.as_ref().try_into()?)),
                    None => None,
                },
                when_then_expr,
                else_expr: match &case.else_expr {
                    Some(e) => Some(Box::new(e.as_ref().try_into()?)),
                    None => None,
                },
            });
            Self {
                expr_type: Some(ExprType::Case(expr)),
            }
        }
        Expr::Cast(Cast { expr, data_type }) => {
            let expr = Box::new(protobuf::CastNode {
                expr: Some(Box::new(expr.as_ref().try_into()?)),
                arrow_type: Some(data_type.try_into()?),
            });
            Self {
                expr_type: Some(ExprType::Cast(expr)),
            }
        }
        Expr::TryCast(TryCast { expr, data_type }) => {
            let expr = Box::new(protobuf::TryCastNode {
                expr: Some(Box::new(expr.as_ref().try_into()?)),
                arrow_type: Some(data_type.try_into()?),
            });
            Self {
                expr_type: Some(ExprType::TryCast(expr)),
            }
        }
        Expr::Sort(Sort {
            expr,
            asc,
            nulls_first,
        }) => {
            let expr = Box::new(protobuf::SortExprNode {
                expr: Some(Box::new(expr.as_ref().try_into()?)),
                asc: *asc,
                nulls_first: *nulls_first,
            });
            Self {
                expr_type: Some(ExprType::Sort(expr)),
            }
        }
        Expr::Negative(expr) => {
            let expr = Box::new(protobuf::NegativeNode {
                expr: Some(Box::new(expr.as_ref().try_into()?)),
            });
            Self {
                expr_type: Some(ExprType::Negative(expr)),
            }
        }
        Expr::InList(InList {
            expr,
            list,
            negated,
        }) => {
            let expr = Box::new(protobuf::InListNode {
                expr: Some(Box::new(expr.as_ref().try_into()?)),
                list: list
                    .iter()
                    .map(|expr| expr.try_into())
                    .collect::<Result<Vec<_>, Error>>()?,
                negated: *negated,
            });
            Self {
                expr_type: Some(ExprType::InList(expr)),
            }
        }
        Expr::Wildcard => Self {
            expr_type: Some(ExprType::Wildcard(true)),
        },
        Expr::ScalarSubquery(_)
        | Expr::InSubquery(_)
        | Expr::Exists { .. }
        | Expr::OuterReferenceColumn { .. } => {
            // we would need to add logical plan operators to datafusion.proto to support this
            // see discussion in https://github.com/apache/arrow-datafusion/issues/2565
            return Err(Error::General("Proto serialization error: Expr::ScalarSubquery(_) | Expr::InSubquery(_) | Expr::Exists { .. } | Expr::OuterReferenceColumn { .. } not supported".to_string()));
        }
        Expr::GetIndexedField(GetIndexedField { expr, field }) => {
            let field = match field {
                GetFieldAccess::NamedStructField { name } => {
                    protobuf::get_indexed_field::Field::NamedStructField(
                        protobuf::NamedStructField {
                            name: Some(name.try_into()?),
                        },
                    )
                }
                GetFieldAccess::ListIndex { key } => {
                    protobuf::get_indexed_field::Field::ListIndex(Box::new(
                        protobuf::ListIndex {
                            key: Some(Box::new(key.as_ref().try_into()?)),
                        },
                    ))
                }
                GetFieldAccess::ListRange { start, stop } => {
                    protobuf::get_indexed_field::Field::ListRange(Box::new(
                        protobuf::ListRange {
                            start: Some(Box::new(start.as_ref().try_into()?)),
                            stop: Some(Box::new(stop.as_ref().try_into()?)),
                        },
                    ))
                }
            };
            Self {
                expr_type: Some(ExprType::GetIndexedField(Box::new(
                    protobuf::GetIndexedField {
                        expr: Some(Box::new(expr.as_ref().try_into()?)),
                        field: Some(field),
                    },
                ))),
            }
        }
        Expr::GroupingSet(GroupingSet::Cube(exprs)) => Self {
            expr_type: Some(ExprType::Cube(CubeNode {
                expr: exprs
                    .iter()
                    .map(|expr| expr.try_into())
                    .collect::<Result<Vec<_>, Self::Error>>()?,
            })),
        },
        Expr::GroupingSet(GroupingSet::Rollup(exprs)) => Self {
            expr_type: Some(ExprType::Rollup(RollupNode {
                expr: exprs
                    .iter()
                    .map(|expr| expr.try_into())
                    .collect::<Result<Vec<_>, Self::Error>>()?,
            })),
        },
        // Each grouping set becomes one `LogicalExprList` entry.
        Expr::GroupingSet(GroupingSet::GroupingSets(exprs)) => Self {
            expr_type: Some(ExprType::GroupingSet(GroupingSetNode {
                expr: exprs
                    .iter()
                    .map(|expr_list| {
                        Ok(LogicalExprList {
                            expr: expr_list
                                .iter()
                                .map(|expr| expr.try_into())
                                .collect::<Result<Vec<_>, Self::Error>>()?,
                        })
                    })
                    .collect::<Result<Vec<_>, Self::Error>>()?,
            })),
        },
        Expr::Placeholder(Placeholder { id, data_type }) => {
            let data_type = match data_type {
                Some(data_type) => Some(data_type.try_into()?),
                None => None,
            };
            Self {
                expr_type: Some(ExprType::Placeholder(PlaceholderNode {
                    id: id.clone(),
                    data_type,
                })),
            }
        }
        Expr::QualifiedWildcard { .. } => {
            return Err(Error::General(
                "Proto serialization error: Expr::QualifiedWildcard { .. } not supported"
                    .to_string(),
            ))
        }
    };
    Ok(expr_node)
}