datafusion/proto/src/physical_plan/to_proto.rs (542 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::{
convert::{TryFrom, TryInto},
str::FromStr,
sync::Arc,
};
use datafusion::physical_plan::expressions::{CastExpr, TryCastExpr};
use datafusion::physical_plan::ColumnStatistics;
use datafusion::physical_plan::{
expressions::{
CaseExpr, InListExpr, IsNotNullExpr, IsNullExpr, NegativeExpr, NotExpr,
},
Statistics,
};
use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion::physical_plan::expressions::{Count, DistinctCount, Literal};
use datafusion::physical_plan::expressions::{
Avg, BinaryExpr, BitAnd, BitOr, BitXor, BoolAnd, BoolOr, Column, LikeExpr, Max, Min,
Sum,
};
use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};
use crate::protobuf;
use crate::protobuf::{
physical_aggregate_expr_node, PhysicalSortExprNode, PhysicalSortExprNodeCollection,
ScalarValue,
};
use datafusion::logical_expr::BuiltinScalarFunction;
use datafusion::physical_expr::expressions::{GetFieldAccessExpr, GetIndexedFieldExpr};
use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
use datafusion::physical_plan::joins::utils::JoinSide;
use datafusion::physical_plan::udaf::AggregateFunctionExpr;
use datafusion_common::{DataFusionError, Result};
impl TryFrom<Arc<dyn AggregateExpr>> for protobuf::PhysicalExprNode {
type Error = DataFusionError;
fn try_from(a: Arc<dyn AggregateExpr>) -> Result<Self, Self::Error> {
use datafusion::physical_plan::expressions;
use protobuf::AggregateFunction;
let expressions: Vec<protobuf::PhysicalExprNode> = a
.expressions()
.iter()
.map(|e| e.clone().try_into())
.collect::<Result<Vec<_>>>()?;
let ordering_req: Vec<protobuf::PhysicalSortExprNode> = a
.order_bys()
.unwrap_or(&[])
.iter()
.map(|e| e.clone().try_into())
.collect::<Result<Vec<_>>>()?;
let mut distinct = false;
let aggr_function = if a.as_any().downcast_ref::<Avg>().is_some() {
Ok(AggregateFunction::Avg.into())
} else if a.as_any().downcast_ref::<Sum>().is_some() {
Ok(AggregateFunction::Sum.into())
} else if a.as_any().downcast_ref::<Count>().is_some() {
Ok(AggregateFunction::Count.into())
} else if a.as_any().downcast_ref::<BitAnd>().is_some() {
Ok(AggregateFunction::BitAnd.into())
} else if a.as_any().downcast_ref::<BitOr>().is_some() {
Ok(AggregateFunction::BitOr.into())
} else if a.as_any().downcast_ref::<BitXor>().is_some() {
Ok(AggregateFunction::BitXor.into())
} else if a.as_any().downcast_ref::<BoolAnd>().is_some() {
Ok(AggregateFunction::BoolAnd.into())
} else if a.as_any().downcast_ref::<BoolOr>().is_some() {
Ok(AggregateFunction::BoolOr.into())
} else if a.as_any().downcast_ref::<DistinctCount>().is_some() {
distinct = true;
Ok(AggregateFunction::Count.into())
} else if a.as_any().downcast_ref::<Min>().is_some() {
Ok(AggregateFunction::Min.into())
} else if a.as_any().downcast_ref::<Max>().is_some() {
Ok(AggregateFunction::Max.into())
} else if a
.as_any()
.downcast_ref::<expressions::ApproxDistinct>()
.is_some()
{
Ok(AggregateFunction::ApproxDistinct.into())
} else if a.as_any().downcast_ref::<expressions::ArrayAgg>().is_some() {
Ok(AggregateFunction::ArrayAgg.into())
} else if a.as_any().downcast_ref::<expressions::Variance>().is_some() {
Ok(AggregateFunction::Variance.into())
} else if a
.as_any()
.downcast_ref::<expressions::VariancePop>()
.is_some()
{
Ok(AggregateFunction::VariancePop.into())
} else if a
.as_any()
.downcast_ref::<expressions::Covariance>()
.is_some()
{
Ok(AggregateFunction::Covariance.into())
} else if a
.as_any()
.downcast_ref::<expressions::CovariancePop>()
.is_some()
{
Ok(AggregateFunction::CovariancePop.into())
} else if a.as_any().downcast_ref::<expressions::Stddev>().is_some() {
Ok(AggregateFunction::Stddev.into())
} else if a
.as_any()
.downcast_ref::<expressions::StddevPop>()
.is_some()
{
Ok(AggregateFunction::StddevPop.into())
} else if a
.as_any()
.downcast_ref::<expressions::Correlation>()
.is_some()
{
Ok(AggregateFunction::Correlation.into())
} else if a
.as_any()
.downcast_ref::<expressions::ApproxPercentileCont>()
.is_some()
{
Ok(AggregateFunction::ApproxPercentileCont.into())
} else if a
.as_any()
.downcast_ref::<expressions::ApproxPercentileContWithWeight>()
.is_some()
{
Ok(AggregateFunction::ApproxPercentileContWithWeight.into())
} else if a
.as_any()
.downcast_ref::<expressions::ApproxMedian>()
.is_some()
{
Ok(AggregateFunction::ApproxMedian.into())
} else if a.as_any().is::<expressions::FirstValue>() {
Ok(AggregateFunction::FirstValueAgg.into())
} else if a.as_any().is::<expressions::LastValue>() {
Ok(AggregateFunction::LastValueAgg.into())
} else {
if let Some(a) = a.as_any().downcast_ref::<AggregateFunctionExpr>() {
return Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
protobuf::PhysicalAggregateExprNode {
aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(a.fun().name.clone())),
expr: expressions,
ordering_req,
distinct,
},
)),
});
}
Err(DataFusionError::NotImplemented(format!(
"Aggregate function not supported: {a:?}"
)))
}?;
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
protobuf::PhysicalAggregateExprNode {
aggregate_function: Some(
physical_aggregate_expr_node::AggregateFunction::AggrFunction(
aggr_function,
),
),
expr: expressions,
ordering_req,
distinct,
},
)),
})
}
}
impl TryFrom<Arc<dyn PhysicalExpr>> for protobuf::PhysicalExprNode {
type Error = DataFusionError;
fn try_from(value: Arc<dyn PhysicalExpr>) -> Result<Self, Self::Error> {
let expr = value.as_any();
if let Some(expr) = expr.downcast_ref::<Column>() {
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
protobuf::PhysicalColumn {
name: expr.name().to_string(),
index: expr.index() as u32,
},
)),
})
} else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
l: Some(Box::new(expr.left().to_owned().try_into()?)),
r: Some(Box::new(expr.right().to_owned().try_into()?)),
op: format!("{:?}", expr.op()),
});
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
binary_expr,
)),
})
} else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
Ok(protobuf::PhysicalExprNode {
expr_type: Some(
protobuf::physical_expr_node::ExprType::Case(
Box::new(
protobuf::PhysicalCaseNode {
expr: expr
.expr()
.map(|exp| exp.clone().try_into().map(Box::new))
.transpose()?,
when_then_expr: expr
.when_then_expr()
.iter()
.map(|(when_expr, then_expr)| {
try_parse_when_then_expr(when_expr, then_expr)
})
.collect::<Result<
Vec<protobuf::PhysicalWhenThen>,
Self::Error,
>>()?,
else_expr: expr
.else_expr()
.map(|a| a.clone().try_into().map(Box::new))
.transpose()?,
},
),
),
),
})
} else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(
Box::new(protobuf::PhysicalNot {
expr: Some(Box::new(expr.arg().to_owned().try_into()?)),
}),
)),
})
} else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
Box::new(protobuf::PhysicalIsNull {
expr: Some(Box::new(expr.arg().to_owned().try_into()?)),
}),
)),
})
} else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
Box::new(protobuf::PhysicalIsNotNull {
expr: Some(Box::new(expr.arg().to_owned().try_into()?)),
}),
)),
})
} else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
Ok(protobuf::PhysicalExprNode {
expr_type: Some(
protobuf::physical_expr_node::ExprType::InList(
Box::new(
protobuf::PhysicalInListNode {
expr: Some(Box::new(expr.expr().to_owned().try_into()?)),
list: expr
.list()
.iter()
.map(|a| a.clone().try_into())
.collect::<Result<
Vec<protobuf::PhysicalExprNode>,
Self::Error,
>>()?,
negated: expr.negated(),
},
),
),
),
})
} else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(
Box::new(protobuf::PhysicalNegativeNode {
expr: Some(Box::new(expr.arg().to_owned().try_into()?)),
}),
)),
})
} else if let Some(lit) = expr.downcast_ref::<Literal>() {
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
lit.value().try_into()?,
)),
})
} else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
protobuf::PhysicalCastNode {
expr: Some(Box::new(cast.expr().clone().try_into()?)),
arrow_type: Some(cast.cast_type().try_into()?),
},
))),
})
} else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(
Box::new(protobuf::PhysicalTryCastNode {
expr: Some(Box::new(cast.expr().clone().try_into()?)),
arrow_type: Some(cast.cast_type().try_into()?),
}),
)),
})
} else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
let args: Vec<protobuf::PhysicalExprNode> = expr
.args()
.iter()
.map(|e| e.to_owned().try_into())
.collect::<Result<Vec<_>, _>>()?;
if let Ok(fun) = BuiltinScalarFunction::from_str(expr.name()) {
let fun: protobuf::ScalarFunction = (&fun).try_into()?;
Ok(protobuf::PhysicalExprNode {
expr_type: Some(
protobuf::physical_expr_node::ExprType::ScalarFunction(
protobuf::PhysicalScalarFunctionNode {
name: expr.name().to_string(),
fun: fun.into(),
args,
return_type: Some(expr.return_type().try_into()?),
},
),
),
})
} else {
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
protobuf::PhysicalScalarUdfNode {
name: expr.name().to_string(),
args,
return_type: Some(expr.return_type().try_into()?),
},
)),
})
}
} else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(
Box::new(protobuf::PhysicalLikeExprNode {
negated: expr.negated(),
case_insensitive: expr.case_insensitive(),
expr: Some(Box::new(expr.expr().to_owned().try_into()?)),
pattern: Some(Box::new(expr.pattern().to_owned().try_into()?)),
}),
)),
})
} else if let Some(expr) = expr.downcast_ref::<GetIndexedFieldExpr>() {
let field = match expr.field() {
GetFieldAccessExpr::NamedStructField{name} => Some(
protobuf::physical_get_indexed_field_expr_node::Field::NamedStructFieldExpr(protobuf::NamedStructFieldExpr {
name: Some(ScalarValue::try_from(name)?)
})
),
GetFieldAccessExpr::ListIndex{key} => Some(
protobuf::physical_get_indexed_field_expr_node::Field::ListIndexExpr(Box::new(protobuf::ListIndexExpr {
key: Some(Box::new(key.to_owned().try_into()?))
}))
),
GetFieldAccessExpr::ListRange{start, stop} => Some(
protobuf::physical_get_indexed_field_expr_node::Field::ListRangeExpr(Box::new(protobuf::ListRangeExpr {
start: Some(Box::new(start.to_owned().try_into()?)),
stop: Some(Box::new(stop.to_owned().try_into()?)),
}))
),
};
Ok(protobuf::PhysicalExprNode {
expr_type: Some(
protobuf::physical_expr_node::ExprType::GetIndexedFieldExpr(
Box::new(protobuf::PhysicalGetIndexedFieldExprNode {
arg: Some(Box::new(expr.arg().to_owned().try_into()?)),
field,
}),
),
),
})
} else {
Err(DataFusionError::Internal(format!(
"physical_plan::to_proto() unsupported expression {value:?}"
)))
}
}
}
/// Serializes one `WHEN <when_expr> THEN <then_expr>` arm of a `CASE`
/// expression.
///
/// The `WHEN` side is converted before the `THEN` side. The first conversion
/// error is returned.
fn try_parse_when_then_expr(
    when_expr: &Arc<dyn PhysicalExpr>,
    then_expr: &Arc<dyn PhysicalExpr>,
) -> Result<protobuf::PhysicalWhenThen> {
    let when_node: protobuf::PhysicalExprNode = Arc::clone(when_expr).try_into()?;
    let then_node: protobuf::PhysicalExprNode = Arc::clone(then_expr).try_into()?;
    Ok(protobuf::PhysicalWhenThen {
        when_expr: Some(when_node),
        then_expr: Some(then_node),
    })
}
impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
    type Error = DataFusionError;

    /// Serializes a listed file: its object-store location, size,
    /// last-modified time (as nanoseconds), partition values and optional
    /// byte range.
    ///
    /// # Errors
    ///
    /// Propagates failures converting partition values or the file range.
    fn try_from(pf: &PartitionedFile) -> Result<Self, Self::Error> {
        let meta = &pf.object_meta;

        let path = meta.location.as_ref().to_owned();
        let size = meta.size as u64;
        let last_modified_ns = meta.last_modified.timestamp_nanos() as u64;

        let partition_values: Vec<protobuf::ScalarValue> = pf
            .partition_values
            .iter()
            .map(|value| value.try_into())
            .collect::<Result<Vec<_>, _>>()?;

        let range: Option<protobuf::FileRange> = match &pf.range {
            Some(file_range) => Some(file_range.try_into()?),
            None => None,
        };

        Ok(protobuf::PartitionedFile {
            path,
            size,
            last_modified_ns,
            partition_values,
            range,
        })
    }
}
impl TryFrom<&FileRange> for protobuf::FileRange {
    type Error = DataFusionError;

    /// Copies the `start`/`end` byte offsets of a file range.
    ///
    /// This conversion never fails. It is `TryFrom` only to match the sibling
    /// conversions.
    fn try_from(value: &FileRange) -> Result<Self, Self::Error> {
        let (start, end) = (value.start, value.end);
        Ok(Self { start, end })
    }
}
impl TryFrom<&[PartitionedFile]> for protobuf::FileGroup {
    type Error = DataFusionError;

    /// Serializes every file of a file group, preserving their order.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first file that fails to convert.
    fn try_from(gr: &[PartitionedFile]) -> Result<Self, Self::Error> {
        let mut files = Vec::with_capacity(gr.len());
        for file in gr {
            files.push(protobuf::PartitionedFile::try_from(file)?);
        }
        Ok(protobuf::FileGroup { files })
    }
}
impl From<&ColumnStatistics> for protobuf::ColumnStats {
fn from(cs: &ColumnStatistics) -> protobuf::ColumnStats {
protobuf::ColumnStats {
min_value: cs.min_value.as_ref().map(|m| m.try_into().unwrap()),
max_value: cs.max_value.as_ref().map(|m| m.try_into().unwrap()),
null_count: cs.null_count.map(|n| n as u32).unwrap_or(0),
distinct_count: cs.distinct_count.map(|n| n as u32).unwrap_or(0),
}
}
}
impl From<&Statistics> for protobuf::Statistics {
    /// Converts plan statistics into their protobuf representation.
    ///
    /// The protobuf row-count and byte-size fields are plain `i64`s, so an
    /// unknown (`None`) value is encoded as `-1`. Missing column statistics
    /// become an empty list.
    fn from(s: &Statistics) -> protobuf::Statistics {
        // Sentinel the protobuf encoding uses for "unknown".
        const UNKNOWN: i64 = -1;

        let column_stats: Vec<protobuf::ColumnStats> = s
            .column_statistics
            .as_ref()
            .map(|columns| columns.iter().map(protobuf::ColumnStats::from).collect())
            .unwrap_or_default();

        let encode = |value: Option<usize>| value.map_or(UNKNOWN, |n| n as i64);

        protobuf::Statistics {
            num_rows: encode(s.num_rows),
            total_byte_size: encode(s.total_byte_size),
            column_stats,
            is_exact: s.is_exact,
        }
    }
}
impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
type Error = DataFusionError;
fn try_from(
conf: &FileScanConfig,
) -> Result<protobuf::FileScanExecConf, Self::Error> {
let file_groups = conf
.file_groups
.iter()
.map(|p| p.as_slice().try_into())
.collect::<Result<Vec<_>, _>>()?;
let mut output_orderings = vec![];
for order in &conf.output_ordering {
let expr_node_vec = order
.iter()
.map(|sort_expr| {
let expr = sort_expr.expr.clone().try_into()?;
Ok(PhysicalSortExprNode {
expr: Some(Box::new(expr)),
asc: !sort_expr.options.descending,
nulls_first: sort_expr.options.nulls_first,
})
})
.collect::<Result<Vec<PhysicalSortExprNode>>>()?;
output_orderings.push(expr_node_vec)
}
Ok(protobuf::FileScanExecConf {
file_groups,
statistics: Some((&conf.statistics).into()),
limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
projection: conf
.projection
.as_ref()
.unwrap_or(&vec![])
.iter()
.map(|n| *n as u32)
.collect(),
schema: Some(conf.file_schema.as_ref().try_into()?),
table_partition_cols: conf
.table_partition_cols
.iter()
.map(|x| x.0.clone())
.collect::<Vec<_>>(),
object_store_url: conf.object_store_url.to_string(),
output_ordering: output_orderings
.into_iter()
.map(|e| PhysicalSortExprNodeCollection {
physical_sort_expr_nodes: e,
})
.collect::<Vec<_>>(),
})
}
}
impl From<JoinSide> for protobuf::JoinSide {
    /// Maps a physical-plan join side onto its protobuf enum counterpart.
    ///
    /// The match is kept exhaustive so that a new `JoinSide` variant fails to
    /// compile here instead of being mis-encoded.
    fn from(side: JoinSide) -> Self {
        match side {
            JoinSide::Left => Self::LeftSide,
            JoinSide::Right => Self::RightSide,
        }
    }
}
impl TryFrom<Option<Arc<dyn PhysicalExpr>>> for protobuf::MaybeFilter {
    type Error = DataFusionError;

    /// Wraps an optional filter expression. `None` becomes an empty filter.
    ///
    /// # Errors
    ///
    /// Propagates a failure to serialize the contained expression.
    fn try_from(expr: Option<Arc<dyn PhysicalExpr>>) -> Result<Self, Self::Error> {
        let expr = expr
            .map(protobuf::PhysicalExprNode::try_from)
            .transpose()?;
        Ok(protobuf::MaybeFilter { expr })
    }
}
impl TryFrom<Option<Vec<PhysicalSortExpr>>> for protobuf::MaybePhysicalSortExprs {
    type Error = DataFusionError;

    /// Serializes an optional sort specification.
    ///
    /// `None` and an empty list both produce an empty `sort_expr` list.
    ///
    /// # Errors
    ///
    /// Returns the first sort-expression conversion failure.
    fn try_from(sort_exprs: Option<Vec<PhysicalSortExpr>>) -> Result<Self, Self::Error> {
        let sort_expr = sort_exprs
            .unwrap_or_default()
            .into_iter()
            .map(protobuf::PhysicalSortExprNode::try_from)
            .collect::<Result<Vec<_>>>()?;
        Ok(protobuf::MaybePhysicalSortExprs { sort_expr })
    }
}
impl TryFrom<PhysicalSortExpr> for protobuf::PhysicalSortExprNode {
type Error = DataFusionError;
fn try_from(sort_expr: PhysicalSortExpr) -> std::result::Result<Self, Self::Error> {
Ok(PhysicalSortExprNode {
expr: Some(Box::new(sort_expr.expr.try_into()?)),
asc: !sort_expr.options.descending,
nulls_first: sort_expr.options.nulls_first,
})
}
}