fn try_from_logical_plan()

in datafusion/proto/src/logical_plan/mod.rs [1000:1809]


    /// Serializes a `LogicalPlan` into its protobuf `LogicalPlanNode` form.
    ///
    /// Every successful arm of the `match` builds a `LogicalPlanNode` whose
    /// `logical_plan_type` holds the protobuf message for that plan variant.
    /// Child plans are serialized by calling this function recursively, and
    /// expressions go through `serialize_expr` / `serialize_exprs` /
    /// `serialize_sorts` using the same `extension_codec`.
    ///
    /// # Arguments
    ///
    /// * `plan` - the logical plan to serialize.
    /// * `extension_codec` - codec that encodes the pieces this function hands
    ///   off as opaque bytes (custom table providers, `LogicalPlan::Extension`
    ///   nodes and the file type of `LogicalPlan::Copy`); it is also passed to
    ///   the expression serializers and to every recursive call.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    ///
    /// * serializing a nested plan, expression, schema or data type fails, or
    ///   the extension codec fails (all propagated with `?`);
    /// * a `ListingTable` uses a file format that none of the format checks
    ///   below recognizes;
    /// * a `Limit` has a skip or fetch that is not a literal;
    /// * a `Repartition` uses `Partitioning::DistributeBy`;
    /// * the plan is one of the variants for which this function only returns
    ///   a "not yet implemented" error: `Subquery`, `DescribeTable`,
    ///   `Statement` variants other than `Prepare`, and the DDL statements
    ///   `CreateMemoryTable`, `CreateIndex`, `DropTable`, `DropCatalogSchema`,
    ///   `CreateFunction` and `DropFunction`.
    fn try_from_logical_plan(
        plan: &LogicalPlan,
        extension_codec: &dyn LogicalExtensionCodec,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        match plan {
            LogicalPlan::Values(Values { values, .. }) => {
                // Row width is taken from the first row; 0 when there are no rows.
                let n_cols = if values.is_empty() {
                    0
                } else {
                    values[0].len()
                } as u64;
                // All rows are flattened into one list of expressions; `n_cols`
                // is stored next to it (presumably so a decoder can split the
                // list back into rows).
                let values_list =
                    serialize_exprs(values.iter().flatten(), extension_codec)?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Values(
                        protobuf::ValuesNode {
                            n_cols,
                            values_list,
                        },
                    )),
                })
            }
            LogicalPlan::TableScan(TableScan {
                table_name,
                source,
                filters,
                projection,
                ..
            }) => {
                let provider = source_as_provider(source)?;
                let schema = provider.schema();
                // `source` is shadowed with the provider as `Any` so that the
                // concrete provider type can be probed with `downcast_ref` below.
                let source = provider.as_any();

                // The plan stores the projection as indices into the provider
                // schema; the protobuf message stores the column names instead.
                let projection = match projection {
                    None => None,
                    Some(columns) => {
                        let column_names = columns
                            .iter()
                            .map(|i| schema.field(*i).name().to_owned())
                            .collect();
                        Some(protobuf::ProjectionColumns {
                            columns: column_names,
                        })
                    }
                };

                let filters: Vec<protobuf::LogicalExprNode> =
                    serialize_exprs(filters, extension_codec)?;

                // Dispatch on the concrete provider type: ListingTable,
                // ViewTable, CteWorkTable, and finally anything else, which is
                // delegated to the extension codec.
                if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
                    let any = listing_table.options().format.as_any();
                    // Probe the file format against each known format type. The
                    // checks are independent `if`s; whichever one matches sets
                    // `maybe_some_type`.
                    let file_format_type = {
                        let mut maybe_some_type = None;

                        // Only compiled when the `parquet` feature is enabled.
                        #[cfg(feature = "parquet")]
                        if let Some(parquet) = any.downcast_ref::<ParquetFormat>() {
                            let options = parquet.options();
                            maybe_some_type =
                                Some(FileFormatType::Parquet(protobuf::ParquetFormat {
                                    options: Some(options.try_into()?),
                                }));
                        };

                        if let Some(csv) = any.downcast_ref::<CsvFormat>() {
                            let options = csv.options();
                            maybe_some_type =
                                Some(FileFormatType::Csv(protobuf::CsvFormat {
                                    options: Some(options.try_into()?),
                                }));
                        }

                        if let Some(json) = any.downcast_ref::<OtherNdJsonFormat>() {
                            let options = json.options();
                            maybe_some_type =
                                Some(FileFormatType::Json(protobuf::NdJsonFormat {
                                    options: Some(options.try_into()?),
                                }))
                        }

                        // Only compiled when the `avro` feature is enabled. The
                        // Avro message has no fields, so a type check is enough.
                        #[cfg(feature = "avro")]
                        if any.is::<AvroFormat>() {
                            maybe_some_type =
                                Some(FileFormatType::Avro(protobuf::AvroFormat {}))
                        }

                        // No check matched: the format cannot be serialized here.
                        if let Some(file_format_type) = maybe_some_type {
                            file_format_type
                        } else {
                            return Err(proto_error(format!(
                            "Error converting file format, {:?} is invalid as a datafusion format.",
                            listing_table.options().format
                        )));
                        }
                    };

                    let options = listing_table.options();

                    // Strip the partition columns from the schema that gets
                    // serialized; they are sent separately as
                    // `table_partition_cols`. Walking the fields in reverse
                    // keeps the not-yet-visited indices valid after a removal.
                    let mut builder = SchemaBuilder::from(schema.as_ref());
                    for (idx, field) in schema.fields().iter().enumerate().rev() {
                        if options
                            .table_partition_cols
                            .iter()
                            .any(|(name, _)| name == field.name())
                        {
                            builder.remove(idx);
                        }
                    }

                    let schema = builder.finish();

                    let schema: protobuf::Schema = (&schema).try_into()?;

                    // One `SortExprNodeCollection` per entry of `file_sort_order`.
                    let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
                    for order in &options.file_sort_order {
                        let expr_vec = SortExprNodeCollection {
                            sort_expr_nodes: serialize_sorts(order, extension_codec)?,
                        };
                        exprs_vec.push(expr_vec);
                    }

                    // Partition columns are serialized as (name, arrow type)
                    // pairs; the first type conversion failure aborts the
                    // whole collection.
                    let partition_columns = options
                        .table_partition_cols
                        .iter()
                        .map(|(name, arrow_type)| {
                            let arrow_type = protobuf::ArrowType::try_from(arrow_type)
                                .map_err(|e| {
                                    proto_error(format!(
                                        "Received an unknown ArrowType: {}",
                                        e
                                    ))
                                })?;
                            Ok(protobuf::PartitionColumn {
                                name: name.clone(),
                                arrow_type: Some(arrow_type),
                            })
                        })
                        .collect::<Result<Vec<_>>>()?;

                    Ok(LogicalPlanNode {
                        logical_plan_type: Some(LogicalPlanType::ListingScan(
                            protobuf::ListingTableScanNode {
                                file_format_type: Some(file_format_type),
                                table_name: Some(table_name.clone().into()),
                                collect_stat: options.collect_stat,
                                file_extension: options.file_extension.clone(),
                                table_partition_cols: partition_columns,
                                paths: listing_table
                                    .table_paths()
                                    .iter()
                                    .map(|x| x.to_string())
                                    .collect(),
                                schema: Some(schema),
                                projection,
                                filters,
                                // NOTE(review): plain `as` cast; it would
                                // silently truncate a value above `u32::MAX`.
                                target_partitions: options.target_partitions as u32,
                                file_sort_order: exprs_vec,
                            },
                        )),
                    })
                } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
                    // A view is serialized together with its underlying plan.
                    // The scan's `filters` are not part of this message.
                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
                    Ok(LogicalPlanNode {
                        logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
                            protobuf::ViewTableScanNode {
                                table_name: Some(table_name.clone().into()),
                                input: Some(Box::new(
                                    LogicalPlanNode::try_from_logical_plan(
                                        view_table.logical_plan(),
                                        extension_codec,
                                    )?,
                                )),
                                schema: Some(schema),
                                projection,
                                // A view without a definition is written as
                                // an empty string.
                                definition: view_table
                                    .definition()
                                    .map(|s| s.to_string())
                                    .unwrap_or_default(),
                            },
                        ))),
                    })
                } else if let Some(cte_work_table) = source.downcast_ref::<CteWorkTable>()
                {
                    // Only the work table's own name and schema are written;
                    // the scan's projection and filters are not part of this
                    // message.
                    let name = cte_work_table.name().to_string();
                    let schema = cte_work_table.schema();
                    let schema: protobuf::Schema = schema.as_ref().try_into()?;

                    Ok(LogicalPlanNode {
                        logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
                            protobuf::CteWorkTableScanNode {
                                name,
                                schema: Some(schema),
                            },
                        )),
                    })
                } else {
                    // Any other provider: let the extension codec encode it
                    // into opaque bytes carried in `custom_table_data`.
                    let schema: protobuf::Schema = schema.as_ref().try_into()?;
                    let mut bytes = vec![];
                    extension_codec
                        .try_encode_table_provider(table_name, provider, &mut bytes)
                        .map_err(|e| context!("Error serializing custom table", e))?;
                    let scan = CustomScan(CustomTableScanNode {
                        table_name: Some(table_name.clone().into()),
                        projection,
                        schema: Some(schema),
                        filters,
                        custom_table_data: bytes,
                    });
                    let node = LogicalPlanNode {
                        logical_plan_type: Some(scan),
                    };
                    Ok(node)
                }
            }
            LogicalPlan::Projection(Projection { expr, input, .. }) => {
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
                        protobuf::ProjectionNode {
                            input: Some(Box::new(
                                LogicalPlanNode::try_from_logical_plan(
                                    input.as_ref(),
                                    extension_codec,
                                )?,
                            )),
                            expr: serialize_exprs(expr, extension_codec)?,
                            // This function never sets an alias on the node.
                            optional_alias: None,
                        },
                    ))),
                })
            }
            // A logical `Filter` is written as a protobuf `SelectionNode`.
            LogicalPlan::Filter(filter) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    filter.input.as_ref(),
                    extension_codec,
                )?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
                        protobuf::SelectionNode {
                            input: Some(Box::new(input)),
                            expr: Some(serialize_expr(
                                &filter.predicate,
                                extension_codec,
                            )?),
                        },
                    ))),
                })
            }
            LogicalPlan::Distinct(Distinct::All(input)) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    input.as_ref(),
                    extension_codec,
                )?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
                        protobuf::DistinctNode {
                            input: Some(Box::new(input)),
                        },
                    ))),
                })
            }
            LogicalPlan::Distinct(Distinct::On(DistinctOn {
                on_expr,
                select_expr,
                sort_expr,
                input,
                ..
            })) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    input.as_ref(),
                    extension_codec,
                )?;
                // An absent sort is written as an empty list.
                let sort_expr = match sort_expr {
                    None => vec![],
                    Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
                };
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
                        protobuf::DistinctOnNode {
                            on_expr: serialize_exprs(on_expr, extension_codec)?,
                            select_expr: serialize_exprs(select_expr, extension_codec)?,
                            sort_expr,
                            input: Some(Box::new(input)),
                        },
                    ))),
                })
            }
            LogicalPlan::Window(Window {
                input, window_expr, ..
            }) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    input.as_ref(),
                    extension_codec,
                )?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Window(Box::new(
                        protobuf::WindowNode {
                            input: Some(Box::new(input)),
                            window_expr: serialize_exprs(window_expr, extension_codec)?,
                        },
                    ))),
                })
            }
            LogicalPlan::Aggregate(Aggregate {
                group_expr,
                aggr_expr,
                input,
                ..
            }) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    input.as_ref(),
                    extension_codec,
                )?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
                        protobuf::AggregateNode {
                            input: Some(Box::new(input)),
                            group_expr: serialize_exprs(group_expr, extension_codec)?,
                            aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
                        },
                    ))),
                })
            }
            LogicalPlan::Join(Join {
                left,
                right,
                on,
                filter,
                join_type,
                join_constraint,
                null_equals_null,
                ..
            }) => {
                let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    left.as_ref(),
                    extension_codec,
                )?;
                let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    right.as_ref(),
                    extension_codec,
                )?;
                // Serialize each (left, right) key pair, stopping at the first
                // failure, then split the pairs into two parallel lists.
                let (left_join_key, right_join_key) = on
                    .iter()
                    .map(|(l, r)| {
                        Ok((
                            serialize_expr(l, extension_codec)?,
                            serialize_expr(r, extension_codec)?,
                        ))
                    })
                    .collect::<Result<Vec<_>, ToProtoError>>()?
                    .into_iter()
                    .unzip();
                let join_type: protobuf::JoinType = join_type.to_owned().into();
                let join_constraint: protobuf::JoinConstraint =
                    join_constraint.to_owned().into();
                // Turn the optional filter into `Result<Option<_>>`: no filter
                // gives `Ok(None)`, otherwise the serialization result is
                // wrapped in `Some`.
                let filter = filter
                    .as_ref()
                    .map(|e| serialize_expr(e, extension_codec))
                    .map_or(Ok(None), |v| v.map(Some))?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Join(Box::new(
                        protobuf::JoinNode {
                            left: Some(Box::new(left)),
                            right: Some(Box::new(right)),
                            join_type: join_type.into(),
                            join_constraint: join_constraint.into(),
                            left_join_key,
                            right_join_key,
                            null_equals_null: *null_equals_null,
                            filter,
                        },
                    ))),
                })
            }
            LogicalPlan::Subquery(_) => {
                not_impl_err!("LogicalPlan serde is not yet implemented for subqueries")
            }
            LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    input.as_ref(),
                    extension_codec,
                )?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
                        protobuf::SubqueryAliasNode {
                            input: Some(Box::new(input)),
                            alias: Some((*alias).clone().into()),
                        },
                    ))),
                })
            }
            LogicalPlan::Limit(limit) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    limit.input.as_ref(),
                    extension_codec,
                )?;
                // Only literal skip/fetch values can be written; any other
                // kind is rejected with an error.
                let SkipType::Literal(skip) = limit.get_skip_type()? else {
                    return Err(proto_error(
                        "LogicalPlan::Limit only supports literal skip values",
                    ));
                };
                let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
                    return Err(proto_error(
                        "LogicalPlan::Limit only supports literal fetch values",
                    ));
                };

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
                        protobuf::LimitNode {
                            input: Some(Box::new(input)),
                            skip: skip as i64,
                            // A missing fetch is written as `i64::MAX`.
                            fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
                        },
                    ))),
                })
            }
            LogicalPlan::Sort(Sort { input, expr, fetch }) => {
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    input.as_ref(),
                    extension_codec,
                )?;
                let sort_expr: Vec<protobuf::SortExprNode> =
                    serialize_sorts(expr, extension_codec)?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
                        protobuf::SortNode {
                            input: Some(Box::new(input)),
                            expr: sort_expr,
                            // A missing fetch is written as -1 (note that
                            // `Limit` above uses `i64::MAX` instead).
                            fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
                        },
                    ))),
                })
            }
            LogicalPlan::Repartition(Repartition {
                input,
                partitioning_scheme,
            }) => {
                use datafusion::logical_expr::Partitioning;
                let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                    input.as_ref(),
                    extension_codec,
                )?;

                // Assumed common usize field was batch size
                // Used u64 to avoid any nastiness involving large values; most data clusters are probably uniformly 64 bits anyway
                use protobuf::repartition_node::PartitionMethod;

                // Hash and round-robin partitioning are supported;
                // `DistributeBy` returns a not-implemented error.
                let pb_partition_method = match partitioning_scheme {
                    Partitioning::Hash(exprs, partition_count) => {
                        PartitionMethod::Hash(protobuf::HashRepartition {
                            hash_expr: serialize_exprs(exprs, extension_codec)?,
                            partition_count: *partition_count as u64,
                        })
                    }
                    Partitioning::RoundRobinBatch(partition_count) => {
                        PartitionMethod::RoundRobin(*partition_count as u64)
                    }
                    Partitioning::DistributeBy(_) => {
                        return not_impl_err!("DistributeBy")
                    }
                };

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
                        protobuf::RepartitionNode {
                            input: Some(Box::new(input)),
                            partition_method: Some(pb_partition_method),
                        },
                    ))),
                })
            }
            // Only `produce_one_row` is written; the remaining fields of
            // `EmptyRelation` (matched by `..`) are not.
            LogicalPlan::EmptyRelation(EmptyRelation {
                produce_one_row, ..
            }) => Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::EmptyRelation(
                    protobuf::EmptyRelationNode {
                        produce_one_row: *produce_one_row,
                    },
                )),
            }),
            // The pattern names every field without `..`, so adding a field to
            // `CreateExternalTable` makes this arm fail to compile until the
            // new field is handled here.
            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
                CreateExternalTable {
                    name,
                    location,
                    file_type,
                    schema: df_schema,
                    table_partition_cols,
                    if_not_exists,
                    definition,
                    order_exprs,
                    unbounded,
                    options,
                    constraints,
                    column_defaults,
                    temporary,
                },
            )) => {
                // One `SortExprNodeCollection` per entry of `order_exprs`.
                let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
                for order in order_exprs {
                    let temp = SortExprNodeCollection {
                        sort_expr_nodes: serialize_sorts(order, extension_codec)?,
                    };
                    converted_order_exprs.push(temp);
                }

                // Column defaults keep their column-name keys; only the
                // expressions are converted.
                let mut converted_column_defaults =
                    HashMap::with_capacity(column_defaults.len());
                for (col_name, expr) in column_defaults {
                    converted_column_defaults
                        .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
                }

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
                        protobuf::CreateExternalTableNode {
                            name: Some(name.clone().into()),
                            location: location.clone(),
                            file_type: file_type.clone(),
                            schema: Some(df_schema.try_into()?),
                            table_partition_cols: table_partition_cols.clone(),
                            if_not_exists: *if_not_exists,
                            temporary: *temporary,
                            order_exprs: converted_order_exprs,
                            // A missing definition is written as the default
                            // value of its type.
                            definition: definition.clone().unwrap_or_default(),
                            unbounded: *unbounded,
                            options: options.clone(),
                            constraints: Some(constraints.clone().into()),
                            column_defaults: converted_column_defaults,
                        },
                    )),
                })
            }
            LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                name,
                input,
                or_replace,
                definition,
                temporary,
            })) => Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
                    protobuf::CreateViewNode {
                        name: Some(name.clone().into()),
                        input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
                            input,
                            extension_codec,
                        )?)),
                        or_replace: *or_replace,
                        temporary: *temporary,
                        // A missing definition is written as the default
                        // value of its type.
                        definition: definition.clone().unwrap_or_default(),
                    },
                ))),
            }),
            LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
                CreateCatalogSchema {
                    schema_name,
                    if_not_exists,
                    schema: df_schema,
                },
            )) => Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
                    protobuf::CreateCatalogSchemaNode {
                        schema_name: schema_name.clone(),
                        if_not_exists: *if_not_exists,
                        schema: Some(df_schema.try_into()?),
                    },
                )),
            }),
            LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
                catalog_name,
                if_not_exists,
                schema: df_schema,
            })) => Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::CreateCatalog(
                    protobuf::CreateCatalogNode {
                        catalog_name: catalog_name.clone(),
                        if_not_exists: *if_not_exists,
                        schema: Some(df_schema.try_into()?),
                    },
                )),
            }),
            // Only the input plan and the `verbose` flag are written.
            LogicalPlan::Analyze(a) => {
                let input = LogicalPlanNode::try_from_logical_plan(
                    a.input.as_ref(),
                    extension_codec,
                )?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
                        protobuf::AnalyzeNode {
                            input: Some(Box::new(input)),
                            verbose: a.verbose,
                        },
                    ))),
                })
            }
            // Only the explained plan (`a.plan`) and the `verbose` flag are
            // written.
            LogicalPlan::Explain(a) => {
                let input = LogicalPlanNode::try_from_logical_plan(
                    a.plan.as_ref(),
                    extension_codec,
                )?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
                        protobuf::ExplainNode {
                            input: Some(Box::new(input)),
                            verbose: a.verbose,
                        },
                    ))),
                })
            }
            LogicalPlan::Union(union) => {
                // Serialize every input; the first failure aborts the union.
                let inputs: Vec<LogicalPlanNode> = union
                    .inputs
                    .iter()
                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
                    .collect::<Result<_>>()?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Union(
                        protobuf::UnionNode { inputs },
                    )),
                })
            }
            LogicalPlan::Extension(extension) => {
                // The extension node itself is encoded to opaque bytes by the
                // codec; its input plans are serialized here, recursively.
                let mut buf: Vec<u8> = vec![];
                extension_codec.try_encode(extension, &mut buf)?;

                let inputs: Vec<LogicalPlanNode> = extension
                    .node
                    .inputs()
                    .iter()
                    .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
                    .collect::<Result<_>>()?;

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Extension(
                        LogicalExtensionNode { node: buf, inputs },
                    )),
                })
            }
            // `Prepare` is the only `Statement` variant with a real
            // serialization; the remaining variants hit the catch-all
            // `LogicalPlan::Statement(_)` arm further down.
            LogicalPlan::Statement(Statement::Prepare(Prepare {
                name,
                data_types,
                input,
            })) => {
                let input =
                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
                        protobuf::PrepareNode {
                            name: name.clone(),
                            // Convert each data type; the first failure aborts.
                            data_types: data_types
                                .iter()
                                .map(|t| t.try_into())
                                .collect::<Result<Vec<_>, _>>()?,
                            input: Some(Box::new(input)),
                        },
                    ))),
                })
            }
            LogicalPlan::Unnest(Unnest {
                input,
                exec_columns,
                list_type_columns,
                struct_type_columns,
                dependency_indices,
                schema,
                options,
            }) => {
                let input =
                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
                // Each (input index, list-unnest description) pair becomes a
                // `ColumnUnnestListItem` holding the index plus the output
                // column and depth.
                let proto_unnest_list_items = list_type_columns
                    .iter()
                    .map(|(index, ul)| ColumnUnnestListItem {
                        input_index: *index as _,
                        recursion: Some(ColumnUnnestListRecursion {
                            output_column: Some(ul.output_column.to_owned().into()),
                            depth: ul.depth as _,
                        }),
                    })
                    .collect();
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
                        protobuf::UnnestNode {
                            input: Some(Box::new(input)),
                            exec_columns: exec_columns
                                .iter()
                                .map(|col| col.into())
                                .collect(),
                            list_type_columns: proto_unnest_list_items,
                            // Indices are widened/cast to `u64` for the
                            // protobuf message.
                            struct_type_columns: struct_type_columns
                                .iter()
                                .map(|c| *c as u64)
                                .collect(),
                            dependency_indices: dependency_indices
                                .iter()
                                .map(|c| *c as u64)
                                .collect(),
                            schema: Some(schema.try_into()?),
                            options: Some(options.into()),
                        },
                    ))),
                })
            }
            // DDL statements without a serialization yet: each returns an
            // error naming the unsupported statement.
            LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for CreateMemoryTable",
            )),
            LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for CreateIndex",
            )),
            LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for DropTable",
            )),
            LogicalPlan::Ddl(DdlStatement::DropView(DropView {
                name,
                if_exists,
                schema,
            })) => Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::DropView(
                    protobuf::DropViewNode {
                        name: Some(name.clone().into()),
                        if_exists: *if_exists,
                        schema: Some(schema.try_into()?),
                    },
                )),
            }),
            LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for DropCatalogSchema",
            )),
            LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for CreateFunction",
            )),
            LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for DropFunction",
            )),
            // Catch-all for every `Statement` variant not matched earlier
            // (`Prepare` has its own arm above).
            LogicalPlan::Statement(_) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for Statement",
            )),
            LogicalPlan::Dml(DmlStatement {
                table_name,
                target,
                op,
                input,
                ..
            }) => {
                let input =
                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
                let dml_type: dml_node::Type = op.into();
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
                        input: Some(Box::new(input)),
                        // The target table is serialized through the
                        // `from_table_source` helper (defined outside this
                        // function).
                        target: Some(Box::new(from_table_source(
                            table_name.clone(),
                            Arc::clone(target),
                            extension_codec,
                        )?)),
                        table_name: Some(table_name.clone().into()),
                        dml_type: dml_type.into(),
                    }))),
                })
            }
            // Only the fields named in the pattern are written; the other
            // `CopyTo` fields (matched by `..`) are not serialized here.
            LogicalPlan::Copy(dml::CopyTo {
                input,
                output_url,
                file_type,
                partition_by,
                ..
            }) => {
                let input =
                    LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
                // The file type is converted with `file_type_to_format` and
                // then encoded to opaque bytes by the extension codec.
                let mut buf = Vec::new();
                extension_codec
                    .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
                        protobuf::CopyToNode {
                            input: Some(Box::new(input)),
                            output_url: output_url.to_string(),
                            file_type: buf,
                            partition_by: partition_by.clone(),
                        },
                    ))),
                })
            }
            LogicalPlan::DescribeTable(_) => Err(proto_error(
                "LogicalPlan serde is not yet implemented for DescribeTable",
            )),
            LogicalPlan::RecursiveQuery(recursive) => {
                // Both the static term and the recursive term are serialized
                // as child plans.
                let static_term = LogicalPlanNode::try_from_logical_plan(
                    recursive.static_term.as_ref(),
                    extension_codec,
                )?;
                let recursive_term = LogicalPlanNode::try_from_logical_plan(
                    recursive.recursive_term.as_ref(),
                    extension_codec,
                )?;

                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
                        protobuf::RecursiveQueryNode {
                            name: recursive.name.clone(),
                            static_term: Some(Box::new(static_term)),
                            recursive_term: Some(Box::new(recursive_term)),
                            is_distinct: recursive.is_distinct,
                        },
                    ))),
                })
            }
        }
    }