in datafusion/proto/src/physical_plan/mod.rs [89:787]
/// Decodes this protobuf node into an executable physical plan.
///
/// The variant stored in `physical_plan_type` selects which operator is
/// built. Child plans are decoded recursively, and expressions are parsed
/// against the schema of the decoded input (or against the schema carried in
/// the message for window, aggregate and join-filter expressions).
///
/// # Arguments
/// * `registry` - passed to expression parsing and used to look up
///   user-defined aggregate functions by name.
/// * `runtime` - forwarded unchanged when decoding child plans.
/// * `extension_codec` - decodes `Extension` nodes and is forwarded when
///   decoding child plans.
///
/// # Errors
/// Returns an error when the plan type is unset, a required field is
/// missing, an enum discriminant is unknown, a partition count does not fit
/// in the target integer type, the aggregate grouping flags are inconsistent
/// with the group expressions, or any nested expression/plan fails to decode.
fn try_into_physical_plan(
    &self,
    registry: &dyn FunctionRegistry,
    runtime: &RuntimeEnv,
    extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn ExecutionPlan>> {
    let plan = self.physical_plan_type.as_ref().ok_or_else(|| {
        proto_error(format!(
            "physical_plan::from_proto() Unsupported physical plan '{self:?}'"
        ))
    })?;
    match plan {
        PhysicalPlanType::Explain(explain) => Ok(Arc::new(ExplainExec::new(
            // A missing schema is a malformed message: report it, don't panic.
            Arc::new(
                explain
                    .schema
                    .as_ref()
                    .ok_or_else(|| {
                        proto_error("Missing schema in Explain physical plan node")
                    })?
                    .try_into()?,
            ),
            explain
                .stringified_plans
                .iter()
                .map(|plan| plan.into())
                .collect(),
            explain.verbose,
        ))),
        PhysicalPlanType::Projection(projection) => {
            let input: Arc<dyn ExecutionPlan> = into_physical_plan(
                &projection.input,
                registry,
                runtime,
                extension_codec,
            )?;
            // Expressions and their output names are stored as parallel lists.
            let exprs = projection
                .expr
                .iter()
                .zip(projection.expr_name.iter())
                .map(|(expr, name)| {
                    Ok((
                        parse_physical_expr(expr, registry, input.schema().as_ref())?,
                        name.to_string(),
                    ))
                })
                .collect::<Result<Vec<(Arc<dyn PhysicalExpr>, String)>>>()?;
            Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
        }
        PhysicalPlanType::Filter(filter) => {
            let input: Arc<dyn ExecutionPlan> = into_physical_plan(
                &filter.input,
                registry,
                runtime,
                extension_codec,
            )?;
            let predicate = filter
                .expr
                .as_ref()
                .map(|expr| {
                    parse_physical_expr(expr, registry, input.schema().as_ref())
                })
                .transpose()?
                .ok_or_else(|| {
                    DataFusionError::Internal(
                        "filter (FilterExecNode) in PhysicalPlanNode is missing."
                            .to_owned(),
                    )
                })?;
            Ok(Arc::new(FilterExec::try_new(predicate, input)?))
        }
        PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new(
            parse_protobuf_file_scan_config(
                scan.base_conf.as_ref().ok_or_else(|| {
                    proto_error("Missing base_conf in CsvScan physical plan node")
                })?,
                registry,
            )?,
            scan.has_header,
            str_to_byte(&scan.delimiter, "delimiter")?,
            str_to_byte(&scan.quote, "quote")?,
            if let Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(
                escape,
            )) = &scan.optional_escape
            {
                Some(str_to_byte(escape, "escape")?)
            } else {
                None
            },
            FileCompressionType::UNCOMPRESSED,
        ))),
        PhysicalPlanType::ParquetScan(scan) => {
            let base_config = parse_protobuf_file_scan_config(
                scan.base_conf.as_ref().ok_or_else(|| {
                    proto_error("Missing base_conf in ParquetScan physical plan node")
                })?,
                registry,
            )?;
            // The optional pruning predicate is parsed against the file schema.
            let predicate = scan
                .predicate
                .as_ref()
                .map(|expr| {
                    parse_physical_expr(
                        expr,
                        registry,
                        base_config.file_schema.as_ref(),
                    )
                })
                .transpose()?;
            Ok(Arc::new(ParquetExec::new(base_config, predicate, None)))
        }
        PhysicalPlanType::AvroScan(scan) => {
            Ok(Arc::new(AvroExec::new(parse_protobuf_file_scan_config(
                scan.base_conf.as_ref().ok_or_else(|| {
                    proto_error("Missing base_conf in AvroScan physical plan node")
                })?,
                registry,
            )?)))
        }
        PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
            let input: Arc<dyn ExecutionPlan> = into_physical_plan(
                &coalesce_batches.input,
                registry,
                runtime,
                extension_codec,
            )?;
            Ok(Arc::new(CoalesceBatchesExec::new(
                input,
                coalesce_batches.target_batch_size as usize,
            )))
        }
        PhysicalPlanType::Merge(merge) => {
            let input: Arc<dyn ExecutionPlan> =
                into_physical_plan(&merge.input, registry, runtime, extension_codec)?;
            Ok(Arc::new(CoalescePartitionsExec::new(input)))
        }
        PhysicalPlanType::Repartition(repart) => {
            let input: Arc<dyn ExecutionPlan> = into_physical_plan(
                &repart.input,
                registry,
                runtime,
                extension_codec,
            )?;
            // Partition counts arrive as wire integers; a value that does not
            // fit the target type is reported as an error instead of panicking.
            match repart.partition_method {
                Some(PartitionMethod::Hash(ref hash_part)) => {
                    let expr = hash_part
                        .hash_expr
                        .iter()
                        .map(|e| {
                            parse_physical_expr(e, registry, input.schema().as_ref())
                        })
                        .collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
                    Ok(Arc::new(RepartitionExec::try_new(
                        input,
                        Partitioning::Hash(
                            expr,
                            hash_part.partition_count.try_into().map_err(|_| {
                                proto_error(format!(
                                    "Invalid hash partition count {}",
                                    hash_part.partition_count
                                ))
                            })?,
                        ),
                    )?))
                }
                Some(PartitionMethod::RoundRobin(partition_count)) => {
                    Ok(Arc::new(RepartitionExec::try_new(
                        input,
                        Partitioning::RoundRobinBatch(
                            partition_count.try_into().map_err(|_| {
                                proto_error(format!(
                                    "Invalid round-robin partition count {partition_count}"
                                ))
                            })?,
                        ),
                    )?))
                }
                Some(PartitionMethod::Unknown(partition_count)) => {
                    Ok(Arc::new(RepartitionExec::try_new(
                        input,
                        Partitioning::UnknownPartitioning(
                            partition_count.try_into().map_err(|_| {
                                proto_error(format!(
                                    "Invalid unknown-partitioning partition count {partition_count}"
                                ))
                            })?,
                        ),
                    )?))
                }
                _ => Err(DataFusionError::Internal(
                    "Invalid partitioning scheme".to_owned(),
                )),
            }
        }
        PhysicalPlanType::GlobalLimit(limit) => {
            let input: Arc<dyn ExecutionPlan> =
                into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
            // A negative `fetch` on the wire means "no fetch limit".
            let fetch = if limit.fetch >= 0 {
                Some(limit.fetch as usize)
            } else {
                None
            };
            Ok(Arc::new(GlobalLimitExec::new(
                input,
                limit.skip as usize,
                fetch,
            )))
        }
        PhysicalPlanType::LocalLimit(limit) => {
            let input: Arc<dyn ExecutionPlan> =
                into_physical_plan(&limit.input, registry, runtime, extension_codec)?;
            Ok(Arc::new(LocalLimitExec::new(input, limit.fetch as usize)))
        }
        PhysicalPlanType::Window(window_agg) => {
            let input: Arc<dyn ExecutionPlan> = into_physical_plan(
                &window_agg.input,
                registry,
                runtime,
                extension_codec,
            )?;
            let input_schema = window_agg.input_schema.as_ref().ok_or_else(|| {
                DataFusionError::Internal(
                    "input_schema in WindowAggrNode is missing.".to_owned(),
                )
            })?;
            // Converted once and shared by expression parsing and the operator.
            let physical_schema: SchemaRef =
                SchemaRef::new(input_schema.try_into()?);
            let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
                .window_expr
                .iter()
                .zip(window_agg.window_expr_name.iter())
                .map(|(expr, name)| {
                    let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty window physical expression")
                    })?;
                    match expr_type {
                        ExprType::WindowExpr(window_node) => {
                            let window_node_expr = window_node
                                .expr
                                .as_ref()
                                .map(|e| {
                                    parse_physical_expr(
                                        e.as_ref(),
                                        registry,
                                        &physical_schema,
                                    )
                                })
                                .transpose()?
                                .ok_or_else(|| {
                                    proto_error(
                                        "missing window_node expr expression"
                                            .to_string(),
                                    )
                                })?;
                            Ok(create_window_expr(
                                &convert_required!(window_node.window_function)?,
                                name.to_owned(),
                                &[window_node_expr],
                                &[],
                                &[],
                                Arc::new(WindowFrame::new(false)),
                                &physical_schema,
                            )?)
                        }
                        _ => Err(DataFusionError::Internal(
                            "Invalid expression for WindowAggrExec".to_string(),
                        )),
                    }
                })
                .collect::<Result<Vec<_>, _>>()?;
            //todo fill partition keys and sort keys
            Ok(Arc::new(WindowAggExec::try_new(
                physical_window_expr,
                input,
                physical_schema,
                vec![],
            )?))
        }
        PhysicalPlanType::Aggregate(hash_agg) => {
            let input: Arc<dyn ExecutionPlan> = into_physical_plan(
                &hash_agg.input,
                registry,
                runtime,
                extension_codec,
            )?;
            let mode = protobuf::AggregateMode::from_i32(hash_agg.mode).ok_or_else(
                || {
                    proto_error(format!(
                        "Received a AggregateNode message with unknown AggregateMode {}",
                        hash_agg.mode
                    ))
                },
            )?;
            let agg_mode: AggregateMode = match mode {
                protobuf::AggregateMode::Partial => AggregateMode::Partial,
                protobuf::AggregateMode::Final => AggregateMode::Final,
                protobuf::AggregateMode::FinalPartitioned => {
                    AggregateMode::FinalPartitioned
                }
                protobuf::AggregateMode::Single => AggregateMode::Single,
                protobuf::AggregateMode::SinglePartitioned => {
                    AggregateMode::SinglePartitioned
                }
            };
            let num_expr = hash_agg.group_expr.len();
            let group_expr = hash_agg
                .group_expr
                .iter()
                .zip(hash_agg.group_expr_name.iter())
                .map(|(expr, name)| {
                    parse_physical_expr(expr, registry, input.schema().as_ref())
                        .map(|expr| (expr, name.to_string()))
                })
                .collect::<Result<Vec<_>, _>>()?;
            // Null expressions are paired with the group expression names.
            let null_expr = hash_agg
                .null_expr
                .iter()
                .zip(hash_agg.group_expr_name.iter())
                .map(|(expr, name)| {
                    parse_physical_expr(expr, registry, input.schema().as_ref())
                        .map(|expr| (expr, name.to_string()))
                })
                .collect::<Result<Vec<_>, _>>()?;
            // `groups` is a flat list of flags, split into rows of `num_expr`
            // entries. `chunks(0)` panics, so flags without any group
            // expression are rejected explicitly.
            let groups: Vec<Vec<bool>> = if hash_agg.groups.is_empty() {
                vec![]
            } else if num_expr == 0 {
                return Err(proto_error(
                    "Received an AggregateNode message with grouping flags but no group expressions",
                ));
            } else {
                hash_agg
                    .groups
                    .chunks(num_expr)
                    .map(|g| g.to_vec())
                    .collect::<Vec<Vec<bool>>>()
            };
            let input_schema = hash_agg.input_schema.as_ref().ok_or_else(|| {
                DataFusionError::Internal(
                    "input_schema in AggregateNode is missing.".to_owned(),
                )
            })?;
            // Converted once and shared by expression parsing and the operator.
            let physical_schema: SchemaRef =
                SchemaRef::new(input_schema.try_into()?);
            let physical_filter_expr = hash_agg
                .filter_expr
                .iter()
                .map(|expr| {
                    expr.expr
                        .as_ref()
                        .map(|e| parse_physical_expr(e, registry, &physical_schema))
                        .transpose()
                })
                .collect::<Result<Vec<_>, _>>()?;
            // An empty ordering list is represented as `None`.
            let physical_order_by_expr = hash_agg
                .order_by_expr
                .iter()
                .map(|expr| {
                    expr.sort_expr
                        .iter()
                        .map(|e| {
                            parse_physical_sort_expr(e, registry, &physical_schema)
                        })
                        .collect::<Result<Vec<_>>>()
                        .map(|exprs| (!exprs.is_empty()).then_some(exprs))
                })
                .collect::<Result<Vec<_>>>()?;
            let physical_aggr_expr: Vec<Arc<dyn AggregateExpr>> = hash_agg
                .aggr_expr
                .iter()
                .zip(hash_agg.aggr_expr_name.iter())
                .map(|(expr, name)| {
                    let expr_type = expr.expr_type.as_ref().ok_or_else(|| {
                        proto_error("Unexpected empty aggregate physical expression")
                    })?;
                    match expr_type {
                        ExprType::AggregateExpr(agg_node) => {
                            // Parse failures are propagated instead of unwrapped.
                            let input_phy_expr: Vec<Arc<dyn PhysicalExpr>> = agg_node
                                .expr
                                .iter()
                                .map(|e| {
                                    parse_physical_expr(e, registry, &physical_schema)
                                })
                                .collect::<Result<Vec<_>>>()?;
                            let ordering_req: Vec<PhysicalSortExpr> = agg_node
                                .ordering_req
                                .iter()
                                .map(|e| {
                                    parse_physical_sort_expr(
                                        e,
                                        registry,
                                        &physical_schema,
                                    )
                                })
                                .collect::<Result<Vec<_>>>()?;
                            agg_node
                                .aggregate_function
                                .as_ref()
                                .map(|func| match func {
                                    AggregateFunction::AggrFunction(i) => {
                                        let aggr_function =
                                            protobuf::AggregateFunction::from_i32(*i)
                                                .ok_or_else(|| {
                                                    proto_error(format!(
                                                        "Received an unknown aggregate function: {i}"
                                                    ))
                                                })?;
                                        create_aggregate_expr(
                                            &aggr_function.into(),
                                            agg_node.distinct,
                                            input_phy_expr.as_slice(),
                                            &ordering_req,
                                            &physical_schema,
                                            name.to_string(),
                                        )
                                    }
                                    AggregateFunction::UserDefinedAggrFunction(
                                        udaf_name,
                                    ) => {
                                        let agg_udf = registry.udaf(udaf_name)?;
                                        udaf::create_aggregate_expr(
                                            agg_udf.as_ref(),
                                            &input_phy_expr,
                                            &physical_schema,
                                            name,
                                        )
                                    }
                                })
                                .transpose()?
                                .ok_or_else(|| {
                                    proto_error(
                                        "Invalid AggregateExpr, missing aggregate_function",
                                    )
                                })
                        }
                        _ => Err(DataFusionError::Internal(
                            "Invalid aggregate expression for AggregateExec"
                                .to_string(),
                        )),
                    }
                })
                .collect::<Result<Vec<_>, _>>()?;
            Ok(Arc::new(AggregateExec::try_new(
                agg_mode,
                PhysicalGroupBy::new(group_expr, null_expr, groups),
                physical_aggr_expr,
                physical_filter_expr,
                physical_order_by_expr,
                input,
                physical_schema,
            )?))
        }
        PhysicalPlanType::HashJoin(hashjoin) => {
            let left: Arc<dyn ExecutionPlan> = into_physical_plan(
                &hashjoin.left,
                registry,
                runtime,
                extension_codec,
            )?;
            let right: Arc<dyn ExecutionPlan> = into_physical_plan(
                &hashjoin.right,
                registry,
                runtime,
                extension_codec,
            )?;
            let on: Vec<(Column, Column)> = hashjoin
                .on
                .iter()
                .map(|col| {
                    let left = into_required!(col.left)?;
                    let right = into_required!(col.right)?;
                    Ok((left, right))
                })
                .collect::<Result<_>>()?;
            let join_type = protobuf::JoinType::from_i32(hashjoin.join_type)
                .ok_or_else(|| {
                    proto_error(format!(
                        "Received a HashJoinNode message with unknown JoinType {}",
                        hashjoin.join_type
                    ))
                })?;
            // The explicit `Result<JoinFilter>` annotation below pins the
            // closure's error type for inference; keep it.
            let filter = hashjoin
                .filter
                .as_ref()
                .map(|f| {
                    let schema = f
                        .schema
                        .as_ref()
                        .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                        .try_into()?;
                    let expression = parse_physical_expr(
                        f.expression.as_ref().ok_or_else(|| {
                            proto_error("Unexpected empty filter expression")
                        })?,
                        registry,
                        &schema,
                    )?;
                    let column_indices = f
                        .column_indices
                        .iter()
                        .map(|i| {
                            let side = protobuf::JoinSide::from_i32(i.side)
                                .ok_or_else(|| {
                                    proto_error(format!(
                                        "Received a HashJoinNode message with JoinSide in Filter {}",
                                        i.side
                                    ))
                                })?;
                            Ok(ColumnIndex {
                                index: i.index as usize,
                                side: side.into(),
                            })
                        })
                        .collect::<Result<Vec<_>>>()?;
                    Ok(JoinFilter::new(expression, column_indices, schema))
                })
                .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
            let partition_mode =
                protobuf::PartitionMode::from_i32(hashjoin.partition_mode)
                    .ok_or_else(|| {
                        proto_error(format!(
                            "Received a HashJoinNode message with unknown PartitionMode {}",
                            hashjoin.partition_mode
                        ))
                    })?;
            let partition_mode = match partition_mode {
                protobuf::PartitionMode::CollectLeft => PartitionMode::CollectLeft,
                protobuf::PartitionMode::Partitioned => PartitionMode::Partitioned,
                protobuf::PartitionMode::Auto => PartitionMode::Auto,
            };
            Ok(Arc::new(HashJoinExec::try_new(
                left,
                right,
                on,
                filter,
                &join_type.into(),
                partition_mode,
                hashjoin.null_equals_null,
            )?))
        }
        PhysicalPlanType::Union(union) => {
            let mut inputs: Vec<Arc<dyn ExecutionPlan>> = vec![];
            for input in &union.inputs {
                inputs.push(input.try_into_physical_plan(
                    registry,
                    runtime,
                    extension_codec,
                )?);
            }
            Ok(Arc::new(UnionExec::new(inputs)))
        }
        PhysicalPlanType::CrossJoin(crossjoin) => {
            let left: Arc<dyn ExecutionPlan> = into_physical_plan(
                &crossjoin.left,
                registry,
                runtime,
                extension_codec,
            )?;
            let right: Arc<dyn ExecutionPlan> = into_physical_plan(
                &crossjoin.right,
                registry,
                runtime,
                extension_codec,
            )?;
            Ok(Arc::new(CrossJoinExec::new(left, right)))
        }
        PhysicalPlanType::Empty(empty) => {
            let schema = Arc::new(convert_required!(empty.schema)?);
            Ok(Arc::new(EmptyExec::new(empty.produce_one_row, schema)))
        }
        PhysicalPlanType::Sort(sort) => {
            let input: Arc<dyn ExecutionPlan> =
                into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
            // Every entry must be a `Sort` expression wrapping the sort key.
            let exprs = sort
                .expr
                .iter()
                .map(|expr| {
                    let expr = expr.expr_type.as_ref().ok_or_else(|| {
                        proto_error(format!(
                            "physical_plan::from_proto() Unexpected expr {self:?}"
                        ))
                    })?;
                    if let protobuf::physical_expr_node::ExprType::Sort(sort_expr) = expr {
                        let expr = sort_expr
                            .expr
                            .as_ref()
                            .ok_or_else(|| {
                                proto_error(format!(
                                    "physical_plan::from_proto() Unexpected sort expr {self:?}"
                                ))
                            })?
                            .as_ref();
                        Ok(PhysicalSortExpr {
                            expr: parse_physical_expr(
                                expr,
                                registry,
                                input.schema().as_ref(),
                            )?,
                            options: SortOptions {
                                descending: !sort_expr.asc,
                                nulls_first: sort_expr.nulls_first,
                            },
                        })
                    } else {
                        Err(DataFusionError::Internal(format!(
                            "physical_plan::from_proto() {self:?}"
                        )))
                    }
                })
                .collect::<Result<Vec<_>, _>>()?;
            // A negative `fetch` on the wire means "no fetch limit".
            let fetch = if sort.fetch < 0 {
                None
            } else {
                Some(sort.fetch as usize)
            };
            let new_sort = SortExec::new(exprs, input)
                .with_fetch(fetch)
                .with_preserve_partitioning(sort.preserve_partitioning);
            Ok(Arc::new(new_sort))
        }
        PhysicalPlanType::SortPreservingMerge(sort) => {
            let input: Arc<dyn ExecutionPlan> =
                into_physical_plan(&sort.input, registry, runtime, extension_codec)?;
            // Every entry must be a `Sort` expression wrapping the sort key.
            let exprs = sort
                .expr
                .iter()
                .map(|expr| {
                    let expr = expr.expr_type.as_ref().ok_or_else(|| {
                        proto_error(format!(
                            "physical_plan::from_proto() Unexpected expr {self:?}"
                        ))
                    })?;
                    if let protobuf::physical_expr_node::ExprType::Sort(sort_expr) = expr {
                        let expr = sort_expr
                            .expr
                            .as_ref()
                            .ok_or_else(|| {
                                proto_error(format!(
                                    "physical_plan::from_proto() Unexpected sort expr {self:?}"
                                ))
                            })?
                            .as_ref();
                        Ok(PhysicalSortExpr {
                            expr: parse_physical_expr(
                                expr,
                                registry,
                                input.schema().as_ref(),
                            )?,
                            options: SortOptions {
                                descending: !sort_expr.asc,
                                nulls_first: sort_expr.nulls_first,
                            },
                        })
                    } else {
                        Err(DataFusionError::Internal(format!(
                            "physical_plan::from_proto() {self:?}"
                        )))
                    }
                })
                .collect::<Result<Vec<_>, _>>()?;
            // A negative `fetch` on the wire means "no fetch limit".
            let fetch = if sort.fetch < 0 {
                None
            } else {
                Some(sort.fetch as usize)
            };
            Ok(Arc::new(
                SortPreservingMergeExec::new(exprs, input).with_fetch(fetch),
            ))
        }
        PhysicalPlanType::Extension(extension) => {
            // Children are decoded first, then handed to the codec together
            // with the opaque node bytes.
            let inputs: Vec<Arc<dyn ExecutionPlan>> = extension
                .inputs
                .iter()
                .map(|i| i.try_into_physical_plan(registry, runtime, extension_codec))
                .collect::<Result<_>>()?;
            let extension_node = extension_codec.try_decode(
                extension.node.as_slice(),
                &inputs,
                registry,
            )?;
            Ok(extension_node)
        }
        PhysicalPlanType::NestedLoopJoin(join) => {
            let left: Arc<dyn ExecutionPlan> =
                into_physical_plan(&join.left, registry, runtime, extension_codec)?;
            let right: Arc<dyn ExecutionPlan> =
                into_physical_plan(&join.right, registry, runtime, extension_codec)?;
            let join_type =
                protobuf::JoinType::from_i32(join.join_type).ok_or_else(|| {
                    proto_error(format!(
                        "Received a NestedLoopJoinExecNode message with unknown JoinType {}",
                        join.join_type
                    ))
                })?;
            // The explicit `Result<JoinFilter>` annotation below pins the
            // closure's error type for inference; keep it.
            let filter = join
                .filter
                .as_ref()
                .map(|f| {
                    let schema = f
                        .schema
                        .as_ref()
                        .ok_or_else(|| proto_error("Missing JoinFilter schema"))?
                        .try_into()?;
                    let expression = parse_physical_expr(
                        f.expression.as_ref().ok_or_else(|| {
                            proto_error("Unexpected empty filter expression")
                        })?,
                        registry,
                        &schema,
                    )?;
                    let column_indices = f
                        .column_indices
                        .iter()
                        .map(|i| {
                            let side = protobuf::JoinSide::from_i32(i.side)
                                .ok_or_else(|| {
                                    proto_error(format!(
                                        "Received a NestedLoopJoinExecNode message with JoinSide in Filter {}",
                                        i.side
                                    ))
                                })?;
                            Ok(ColumnIndex {
                                index: i.index as usize,
                                side: side.into(),
                            })
                        })
                        .collect::<Result<Vec<_>>>()?;
                    Ok(JoinFilter::new(expression, column_indices, schema))
                })
                .map_or(Ok(None), |v: Result<JoinFilter>| v.map(Some))?;
            Ok(Arc::new(NestedLoopJoinExec::try_new(
                left,
                right,
                filter,
                &join_type.into(),
            )?))
        }
    }
}