in datafusion/proto/src/physical_plan/mod.rs [330:541]
fn try_from_physical_plan(
plan: Arc<dyn ExecutionPlan>,
extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Self>
where
Self: Sized,
{
let plan_clone = Arc::clone(&plan);
let plan = plan.as_any();
if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
return protobuf::PhysicalPlanNode::try_from_explain_exec(
exec,
extension_codec,
);
}
if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
return protobuf::PhysicalPlanNode::try_from_projection_exec(
exec,
extension_codec,
);
}
if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
return protobuf::PhysicalPlanNode::try_from_analyze_exec(
exec,
extension_codec,
);
}
if let Some(exec) = plan.downcast_ref::<FilterExec>() {
return protobuf::PhysicalPlanNode::try_from_filter_exec(
exec,
extension_codec,
);
}
if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
limit,
extension_codec,
);
}
if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
limit,
extension_codec,
);
}
if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
exec,
extension_codec,
);
}
if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
exec,
extension_codec,
);
}
if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
exec,
extension_codec,
);
}
if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
exec,
extension_codec,
);
}
if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
return protobuf::PhysicalPlanNode::try_from_empty_exec(
empty,
extension_codec,
);
}
if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
empty,
extension_codec,
);
}
if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
coalesce_batches,
extension_codec,
);
}
if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
data_source_exec,
extension_codec,
)? {
return Ok(node);
}
}
if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
exec,
extension_codec,
);
}
if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
return protobuf::PhysicalPlanNode::try_from_repartition_exec(
exec,
extension_codec,
);
}
if let Some(exec) = plan.downcast_ref::<SortExec>() {
return protobuf::PhysicalPlanNode::try_from_sort_exec(exec, extension_codec);
}
if let Some(union) = plan.downcast_ref::<UnionExec>() {
return protobuf::PhysicalPlanNode::try_from_union_exec(
union,
extension_codec,
);
}
if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
return protobuf::PhysicalPlanNode::try_from_interleave_exec(
interleave,
extension_codec,
);
}
if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
exec,
extension_codec,
);
}
if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
exec,
extension_codec,
);
}
if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
exec,
extension_codec,
);
}
if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
exec,
extension_codec,
);
}
if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
exec,
extension_codec,
)? {
return Ok(node);
}
}
if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
return protobuf::PhysicalPlanNode::try_from_unnest_exec(
exec,
extension_codec,
);
}
let mut buf: Vec<u8> = vec![];
match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
Ok(_) => {
let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
.children()
.into_iter()
.cloned()
.map(|i| {
protobuf::PhysicalPlanNode::try_from_physical_plan(
i,
extension_codec,
)
})
.collect::<Result<_>>()?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Extension(
protobuf::PhysicalExtensionNode { node: buf, inputs },
)),
})
}
Err(e) => internal_err!(
"Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
),
}
}