in datafusion/proto/src/physical_plan/mod.rs [789:1308]
fn try_from_physical_plan(
plan: Arc<dyn ExecutionPlan>,
extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Self>
where
Self: Sized,
{
let plan_clone = plan.clone();
let plan = plan.as_any();
if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Explain(
protobuf::ExplainExecNode {
schema: Some(exec.schema().as_ref().try_into()?),
stringified_plans: exec
.stringified_plans()
.iter()
.map(|plan| plan.into())
.collect(),
verbose: exec.verbose(),
},
)),
})
} else if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
let expr = exec
.expr()
.iter()
.map(|expr| expr.0.clone().try_into())
.collect::<Result<Vec<_>>>()?;
let expr_name = exec.expr().iter().map(|expr| expr.1.clone()).collect();
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Projection(Box::new(
protobuf::ProjectionExecNode {
input: Some(Box::new(input)),
expr,
expr_name,
},
))),
})
} else if let Some(exec) = plan.downcast_ref::<FilterExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Filter(Box::new(
protobuf::FilterExecNode {
input: Some(Box::new(input)),
expr: Some(exec.predicate().clone().try_into()?),
},
))),
})
} else if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
limit.input().to_owned(),
extension_codec,
)?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::GlobalLimit(Box::new(
protobuf::GlobalLimitExecNode {
input: Some(Box::new(input)),
skip: limit.skip() as u32,
fetch: match limit.fetch() {
Some(n) => n as i64,
_ => -1, // no limit
},
},
))),
})
} else if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
limit.input().to_owned(),
extension_codec,
)?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::LocalLimit(Box::new(
protobuf::LocalLimitExecNode {
input: Some(Box::new(input)),
fetch: limit.fetch() as u32,
},
))),
})
} else if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.left().to_owned(),
extension_codec,
)?;
let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.right().to_owned(),
extension_codec,
)?;
let on: Vec<protobuf::JoinOn> = exec
.on()
.iter()
.map(|tuple| protobuf::JoinOn {
left: Some(protobuf::PhysicalColumn {
name: tuple.0.name().to_string(),
index: tuple.0.index() as u32,
}),
right: Some(protobuf::PhysicalColumn {
name: tuple.1.name().to_string(),
index: tuple.1.index() as u32,
}),
})
.collect();
let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
let filter = exec
.filter()
.as_ref()
.map(|f| {
let expression = f.expression().to_owned().try_into()?;
let column_indices = f
.column_indices()
.iter()
.map(|i| {
let side: protobuf::JoinSide = i.side.to_owned().into();
protobuf::ColumnIndex {
index: i.index as u32,
side: side.into(),
}
})
.collect();
let schema = f.schema().try_into()?;
Ok(protobuf::JoinFilter {
expression: Some(expression),
column_indices,
schema: Some(schema),
})
})
.map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
let partition_mode = match exec.partition_mode() {
PartitionMode::CollectLeft => protobuf::PartitionMode::CollectLeft,
PartitionMode::Partitioned => protobuf::PartitionMode::Partitioned,
PartitionMode::Auto => protobuf::PartitionMode::Auto,
};
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::HashJoin(Box::new(
protobuf::HashJoinExecNode {
left: Some(Box::new(left)),
right: Some(Box::new(right)),
on,
join_type: join_type.into(),
partition_mode: partition_mode.into(),
null_equals_null: exec.null_equals_null(),
filter,
},
))),
})
} else if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.left().to_owned(),
extension_codec,
)?;
let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.right().to_owned(),
extension_codec,
)?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::CrossJoin(Box::new(
protobuf::CrossJoinExecNode {
left: Some(Box::new(left)),
right: Some(Box::new(right)),
},
))),
})
} else if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
let groups: Vec<bool> = exec
.group_expr()
.groups()
.iter()
.flatten()
.copied()
.collect();
let group_names = exec
.group_expr()
.expr()
.iter()
.map(|expr| expr.1.to_owned())
.collect();
let filter = exec
.filter_expr()
.iter()
.map(|expr| expr.to_owned().try_into())
.collect::<Result<Vec<_>>>()?;
let order_by = exec
.order_by_expr()
.iter()
.map(|expr| expr.to_owned().try_into())
.collect::<Result<Vec<_>>>()?;
let agg = exec
.aggr_expr()
.iter()
.map(|expr| expr.to_owned().try_into())
.collect::<Result<Vec<_>>>()?;
let agg_names = exec
.aggr_expr()
.iter()
.map(|expr| match expr.field() {
Ok(field) => Ok(field.name().clone()),
Err(e) => Err(e),
})
.collect::<Result<_>>()?;
let agg_mode = match exec.mode() {
AggregateMode::Partial => protobuf::AggregateMode::Partial,
AggregateMode::Final => protobuf::AggregateMode::Final,
AggregateMode::FinalPartitioned => {
protobuf::AggregateMode::FinalPartitioned
}
AggregateMode::Single => protobuf::AggregateMode::Single,
AggregateMode::SinglePartitioned => {
protobuf::AggregateMode::SinglePartitioned
}
};
let input_schema = exec.input_schema();
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
let null_expr = exec
.group_expr()
.null_expr()
.iter()
.map(|expr| expr.0.to_owned().try_into())
.collect::<Result<Vec<_>>>()?;
let group_expr = exec
.group_expr()
.expr()
.iter()
.map(|expr| expr.0.to_owned().try_into())
.collect::<Result<Vec<_>>>()?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Aggregate(Box::new(
protobuf::AggregateExecNode {
group_expr,
group_expr_name: group_names,
aggr_expr: agg,
filter_expr: filter,
order_by_expr: order_by,
aggr_expr_name: agg_names,
mode: agg_mode as i32,
input: Some(Box::new(input)),
input_schema: Some(input_schema.as_ref().try_into()?),
null_expr,
groups,
},
))),
})
} else if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
let schema = empty.schema().as_ref().try_into()?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Empty(
protobuf::EmptyExecNode {
produce_one_row: empty.produce_one_row(),
schema: Some(schema),
},
)),
})
} else if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>()
{
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
coalesce_batches.input().to_owned(),
extension_codec,
)?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::CoalesceBatches(Box::new(
protobuf::CoalesceBatchesExecNode {
input: Some(Box::new(input)),
target_batch_size: coalesce_batches.target_batch_size() as u32,
},
))),
})
} else if let Some(exec) = plan.downcast_ref::<CsvExec>() {
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::CsvScan(
protobuf::CsvScanExecNode {
base_conf: Some(exec.base_config().try_into()?),
has_header: exec.has_header(),
delimiter: byte_to_string(exec.delimiter(), "delimiter")?,
quote: byte_to_string(exec.quote(), "quote")?,
optional_escape: if let Some(escape) = exec.escape() {
Some(protobuf::csv_scan_exec_node::OptionalEscape::Escape(
byte_to_string(escape, "escape")?,
))
} else {
None
},
},
)),
})
} else if let Some(exec) = plan.downcast_ref::<ParquetExec>() {
let predicate = exec
.predicate()
.map(|pred| pred.clone().try_into())
.transpose()?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::ParquetScan(
protobuf::ParquetScanExecNode {
base_conf: Some(exec.base_config().try_into()?),
predicate,
},
)),
})
} else if let Some(exec) = plan.downcast_ref::<AvroExec>() {
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::AvroScan(
protobuf::AvroScanExecNode {
base_conf: Some(exec.base_config().try_into()?),
},
)),
})
} else if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
protobuf::CoalescePartitionsExecNode {
input: Some(Box::new(input)),
},
))),
})
} else if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
let pb_partition_method = match exec.partitioning() {
Partitioning::Hash(exprs, partition_count) => {
PartitionMethod::Hash(protobuf::PhysicalHashRepartition {
hash_expr: exprs
.iter()
.map(|expr| expr.clone().try_into())
.collect::<Result<Vec<_>>>()?,
partition_count: *partition_count as u64,
})
}
Partitioning::RoundRobinBatch(partition_count) => {
PartitionMethod::RoundRobin(*partition_count as u64)
}
Partitioning::UnknownPartitioning(partition_count) => {
PartitionMethod::Unknown(*partition_count as u64)
}
};
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Repartition(Box::new(
protobuf::RepartitionExecNode {
input: Some(Box::new(input)),
partition_method: Some(pb_partition_method),
},
))),
})
} else if let Some(exec) = plan.downcast_ref::<SortExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
let expr = exec
.expr()
.iter()
.map(|expr| {
let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
expr: Some(Box::new(expr.expr.to_owned().try_into()?)),
asc: !expr.options.descending,
nulls_first: expr.options.nulls_first,
});
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::Sort(
sort_expr,
)),
})
})
.collect::<Result<Vec<_>>>()?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Sort(Box::new(
protobuf::SortExecNode {
input: Some(Box::new(input)),
expr,
fetch: match exec.fetch() {
Some(n) => n as i64,
_ => -1,
},
preserve_partitioning: exec.preserve_partitioning(),
},
))),
})
} else if let Some(union) = plan.downcast_ref::<UnionExec>() {
let mut inputs: Vec<PhysicalPlanNode> = vec![];
for input in union.inputs() {
inputs.push(protobuf::PhysicalPlanNode::try_from_physical_plan(
input.to_owned(),
extension_codec,
)?);
}
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Union(
protobuf::UnionExecNode { inputs },
)),
})
} else if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
let input = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.input().to_owned(),
extension_codec,
)?;
let expr = exec
.expr()
.iter()
.map(|expr| {
let sort_expr = Box::new(protobuf::PhysicalSortExprNode {
expr: Some(Box::new(expr.expr.to_owned().try_into()?)),
asc: !expr.options.descending,
nulls_first: expr.options.nulls_first,
});
Ok(protobuf::PhysicalExprNode {
expr_type: Some(protobuf::physical_expr_node::ExprType::Sort(
sort_expr,
)),
})
})
.collect::<Result<Vec<_>>>()?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::SortPreservingMerge(
Box::new(protobuf::SortPreservingMergeExecNode {
input: Some(Box::new(input)),
expr,
fetch: exec.fetch().map(|f| f as i64).unwrap_or(-1),
}),
)),
})
} else if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
let left = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.left().to_owned(),
extension_codec,
)?;
let right = protobuf::PhysicalPlanNode::try_from_physical_plan(
exec.right().to_owned(),
extension_codec,
)?;
let join_type: protobuf::JoinType = exec.join_type().to_owned().into();
let filter = exec
.filter()
.as_ref()
.map(|f| {
let expression = f.expression().to_owned().try_into()?;
let column_indices = f
.column_indices()
.iter()
.map(|i| {
let side: protobuf::JoinSide = i.side.to_owned().into();
protobuf::ColumnIndex {
index: i.index as u32,
side: side.into(),
}
})
.collect();
let schema = f.schema().try_into()?;
Ok(protobuf::JoinFilter {
expression: Some(expression),
column_indices,
schema: Some(schema),
})
})
.map_or(Ok(None), |v: Result<protobuf::JoinFilter>| v.map(Some))?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::NestedLoopJoin(Box::new(
protobuf::NestedLoopJoinExecNode {
left: Some(Box::new(left)),
right: Some(Box::new(right)),
join_type: join_type.into(),
filter,
},
))),
})
} else {
let mut buf: Vec<u8> = vec![];
match extension_codec.try_encode(plan_clone.clone(), &mut buf) {
Ok(_) => {
let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
.children()
.into_iter()
.map(|i| {
protobuf::PhysicalPlanNode::try_from_physical_plan(
i,
extension_codec,
)
})
.collect::<Result<_>>()?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Extension(
protobuf::PhysicalExtensionNode { node: buf, inputs },
)),
})
}
Err(e) => Err(DataFusionError::Internal(format!(
"Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
))),
}
}
}