fn try_from_physical_plan()

in datafusion/proto/src/physical_plan/mod.rs [330:541]


    fn try_from_physical_plan(
        plan: Arc<dyn ExecutionPlan>,
        extension_codec: &dyn PhysicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let plan_clone = Arc::clone(&plan);
        let plan = plan.as_any();

        if let Some(exec) = plan.downcast_ref::<ExplainExec>() {
            return protobuf::PhysicalPlanNode::try_from_explain_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<ProjectionExec>() {
            return protobuf::PhysicalPlanNode::try_from_projection_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AnalyzeExec>() {
            return protobuf::PhysicalPlanNode::try_from_analyze_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<FilterExec>() {
            return protobuf::PhysicalPlanNode::try_from_filter_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(limit) = plan.downcast_ref::<GlobalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_global_limit_exec(
                limit,
                extension_codec,
            );
        }

        if let Some(limit) = plan.downcast_ref::<LocalLimitExec>() {
            return protobuf::PhysicalPlanNode::try_from_local_limit_exec(
                limit,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<HashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_hash_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SymmetricHashJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_symmetric_hash_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<CrossJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_cross_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<AggregateExec>() {
            return protobuf::PhysicalPlanNode::try_from_aggregate_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(empty) = plan.downcast_ref::<EmptyExec>() {
            return protobuf::PhysicalPlanNode::try_from_empty_exec(
                empty,
                extension_codec,
            );
        }

        if let Some(empty) = plan.downcast_ref::<PlaceholderRowExec>() {
            return protobuf::PhysicalPlanNode::try_from_placeholder_row_exec(
                empty,
                extension_codec,
            );
        }

        if let Some(coalesce_batches) = plan.downcast_ref::<CoalesceBatchesExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_batches_exec(
                coalesce_batches,
                extension_codec,
            );
        }

        if let Some(data_source_exec) = plan.downcast_ref::<DataSourceExec>() {
            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_source_exec(
                data_source_exec,
                extension_codec,
            )? {
                return Ok(node);
            }
        }

        if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
            return protobuf::PhysicalPlanNode::try_from_coalesce_partitions_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<RepartitionExec>() {
            return protobuf::PhysicalPlanNode::try_from_repartition_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_exec(exec, extension_codec);
        }

        if let Some(union) = plan.downcast_ref::<UnionExec>() {
            return protobuf::PhysicalPlanNode::try_from_union_exec(
                union,
                extension_codec,
            );
        }

        if let Some(interleave) = plan.downcast_ref::<InterleaveExec>() {
            return protobuf::PhysicalPlanNode::try_from_interleave_exec(
                interleave,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<SortPreservingMergeExec>() {
            return protobuf::PhysicalPlanNode::try_from_sort_preserving_merge_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<NestedLoopJoinExec>() {
            return protobuf::PhysicalPlanNode::try_from_nested_loop_join_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<WindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_window_agg_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<BoundedWindowAggExec>() {
            return protobuf::PhysicalPlanNode::try_from_bounded_window_agg_exec(
                exec,
                extension_codec,
            );
        }

        if let Some(exec) = plan.downcast_ref::<DataSinkExec>() {
            if let Some(node) = protobuf::PhysicalPlanNode::try_from_data_sink_exec(
                exec,
                extension_codec,
            )? {
                return Ok(node);
            }
        }

        if let Some(exec) = plan.downcast_ref::<UnnestExec>() {
            return protobuf::PhysicalPlanNode::try_from_unnest_exec(
                exec,
                extension_codec,
            );
        }

        let mut buf: Vec<u8> = vec![];
        match extension_codec.try_encode(Arc::clone(&plan_clone), &mut buf) {
            Ok(_) => {
                let inputs: Vec<protobuf::PhysicalPlanNode> = plan_clone
                    .children()
                    .into_iter()
                    .cloned()
                    .map(|i| {
                        protobuf::PhysicalPlanNode::try_from_physical_plan(
                            i,
                            extension_codec,
                        )
                    })
                    .collect::<Result<_>>()?;

                Ok(protobuf::PhysicalPlanNode {
                    physical_plan_type: Some(PhysicalPlanType::Extension(
                        protobuf::PhysicalExtensionNode { node: buf, inputs },
                    )),
                })
            }
            Err(e) => internal_err!(
                "Unsupported plan and extension codec failed with [{e}]. Plan: {plan_clone:?}"
            ),
        }
    }