in datafusion/proto/src/logical_plan/mod.rs [1000:1809]
/// Converts a [`LogicalPlan`] into its protobuf [`LogicalPlanNode`] form.
///
/// Each supported plan variant is mapped onto the matching
/// [`LogicalPlanType`] variant, and child plans are serialized recursively
/// with the same `extension_codec`. The codec is also used for table
/// providers that are not a `ListingTable`, `ViewTable` or `CteWorkTable`,
/// for `LogicalPlan::Extension` nodes, and for the file format of `COPY TO`.
///
/// # Errors
///
/// Returns an error when the plan (or one of its children) is a variant with
/// no protobuf encoding (e.g. subqueries, most DDL/statement variants,
/// `DescribeTable`, `DistributeBy` repartitioning, non-literal limits), when a
/// listing table uses an unrecognized file format, or when expression, schema,
/// data type, or codec serialization fails.
fn try_from_logical_plan(
    plan: &LogicalPlan,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<Self>
where
    Self: Sized,
{
    match plan {
        LogicalPlan::Values(Values { values, .. }) => {
            // The column count is taken from the first row (all rows are
            // assumed to share its arity); an empty VALUES list has zero
            // columns. The rows themselves are flattened into one list.
            let n_cols = values.first().map_or(0, |row| row.len()) as u64;
            let values_list =
                serialize_exprs(values.iter().flatten(), extension_codec)?;
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Values(
                    protobuf::ValuesNode {
                        n_cols,
                        values_list,
                    },
                )),
            })
        }
        // NOTE(review): fields matched by `..` below are not serialized.
        LogicalPlan::TableScan(TableScan {
            table_name,
            source,
            filters,
            projection,
            ..
        }) => {
            let provider = source_as_provider(source)?;
            let schema = provider.schema();
            let source = provider.as_any();
            // Projections are sent as column names resolved against the
            // provider schema rather than as column indices.
            let projection = match projection {
                None => None,
                Some(columns) => {
                    let column_names = columns
                        .iter()
                        .map(|i| schema.field(*i).name().to_owned())
                        .collect();
                    Some(protobuf::ProjectionColumns {
                        columns: column_names,
                    })
                }
            };
            let filters: Vec<protobuf::LogicalExprNode> =
                serialize_exprs(filters, extension_codec)?;
            if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
                // Map the listing table's file format onto one of the
                // built-in protobuf formats; anything else is rejected.
                let any = listing_table.options().format.as_any();
                let file_format_type = {
                    let mut maybe_some_type = None;
                    #[cfg(feature = "parquet")]
                    if let Some(parquet) = any.downcast_ref::<ParquetFormat>() {
                        let options = parquet.options();
                        maybe_some_type =
                            Some(FileFormatType::Parquet(protobuf::ParquetFormat {
                                options: Some(options.try_into()?),
                            }));
                    };
                    if let Some(csv) = any.downcast_ref::<CsvFormat>() {
                        let options = csv.options();
                        maybe_some_type =
                            Some(FileFormatType::Csv(protobuf::CsvFormat {
                                options: Some(options.try_into()?),
                            }));
                    }
                    if let Some(json) = any.downcast_ref::<OtherNdJsonFormat>() {
                        let options = json.options();
                        maybe_some_type =
                            Some(FileFormatType::Json(protobuf::NdJsonFormat {
                                options: Some(options.try_into()?),
                            }))
                    }
                    #[cfg(feature = "avro")]
                    if any.is::<AvroFormat>() {
                        maybe_some_type =
                            Some(FileFormatType::Avro(protobuf::AvroFormat {}))
                    }
                    if let Some(file_format_type) = maybe_some_type {
                        file_format_type
                    } else {
                        return Err(proto_error(format!(
                            "Error converting file format, {:?} is invalid as a datafusion format.",
                            listing_table.options().format
                        )));
                    }
                };
                let options = listing_table.options();
                // The serialized schema excludes partition columns, which are
                // sent separately in `table_partition_cols`. Walk the fields in
                // reverse so each removal leaves the not-yet-visited indices
                // unchanged.
                let mut builder = SchemaBuilder::from(schema.as_ref());
                for (idx, field) in schema.fields().iter().enumerate().rev() {
                    if options
                        .table_partition_cols
                        .iter()
                        .any(|(name, _)| name == field.name())
                    {
                        builder.remove(idx);
                    }
                }
                let schema = builder.finish();
                let schema: protobuf::Schema = (&schema).try_into()?;
                let mut exprs_vec: Vec<SortExprNodeCollection> = vec![];
                for order in &options.file_sort_order {
                    let expr_vec = SortExprNodeCollection {
                        sort_expr_nodes: serialize_sorts(order, extension_codec)?,
                    };
                    exprs_vec.push(expr_vec);
                }
                let partition_columns = options
                    .table_partition_cols
                    .iter()
                    .map(|(name, arrow_type)| {
                        let arrow_type = protobuf::ArrowType::try_from(arrow_type)
                            .map_err(|e| {
                                proto_error(format!(
                                    "Received an unknown ArrowType: {}",
                                    e
                                ))
                            })?;
                        Ok(protobuf::PartitionColumn {
                            name: name.clone(),
                            arrow_type: Some(arrow_type),
                        })
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::ListingScan(
                        protobuf::ListingTableScanNode {
                            file_format_type: Some(file_format_type),
                            table_name: Some(table_name.clone().into()),
                            collect_stat: options.collect_stat,
                            file_extension: options.file_extension.clone(),
                            table_partition_cols: partition_columns,
                            paths: listing_table
                                .table_paths()
                                .iter()
                                .map(|x| x.to_string())
                                .collect(),
                            schema: Some(schema),
                            projection,
                            filters,
                            target_partitions: options.target_partitions as u32,
                            file_sort_order: exprs_vec,
                        },
                    )),
                })
            } else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
                // Views carry their defining plan, serialized recursively.
                let schema: protobuf::Schema = schema.as_ref().try_into()?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
                        protobuf::ViewTableScanNode {
                            table_name: Some(table_name.clone().into()),
                            input: Some(Box::new(
                                LogicalPlanNode::try_from_logical_plan(
                                    view_table.logical_plan(),
                                    extension_codec,
                                )?,
                            )),
                            schema: Some(schema),
                            projection,
                            definition: view_table
                                .definition()
                                .map(|s| s.to_string())
                                .unwrap_or_default(),
                        },
                    ))),
                })
            } else if let Some(cte_work_table) = source.downcast_ref::<CteWorkTable>()
            {
                // Only the name and the work table's own schema are sent;
                // projection and filters are not part of this node.
                let name = cte_work_table.name().to_string();
                let schema = cte_work_table.schema();
                let schema: protobuf::Schema = schema.as_ref().try_into()?;
                Ok(LogicalPlanNode {
                    logical_plan_type: Some(LogicalPlanType::CteWorkTableScan(
                        protobuf::CteWorkTableScanNode {
                            name,
                            schema: Some(schema),
                        },
                    )),
                })
            } else {
                // Any other provider is opaque here: delegate its encoding to
                // the extension codec and ship the resulting bytes.
                let schema: protobuf::Schema = schema.as_ref().try_into()?;
                let mut bytes = vec![];
                extension_codec
                    .try_encode_table_provider(table_name, provider, &mut bytes)
                    .map_err(|e| context!("Error serializing custom table", e))?;
                let scan = CustomScan(CustomTableScanNode {
                    table_name: Some(table_name.clone().into()),
                    projection,
                    schema: Some(schema),
                    filters,
                    custom_table_data: bytes,
                });
                let node = LogicalPlanNode {
                    logical_plan_type: Some(scan),
                };
                Ok(node)
            }
        }
        LogicalPlan::Projection(Projection { expr, input, .. }) => {
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
                    protobuf::ProjectionNode {
                        input: Some(Box::new(
                            LogicalPlanNode::try_from_logical_plan(
                                input.as_ref(),
                                extension_codec,
                            )?,
                        )),
                        expr: serialize_exprs(expr, extension_codec)?,
                        optional_alias: None,
                    },
                ))),
            })
        }
        LogicalPlan::Filter(filter) => {
            let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                filter.input.as_ref(),
                extension_codec,
            )?;
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
                    protobuf::SelectionNode {
                        input: Some(Box::new(input)),
                        expr: Some(serialize_expr(
                            &filter.predicate,
                            extension_codec,
                        )?),
                    },
                ))),
            })
        }
        LogicalPlan::Distinct(Distinct::All(input)) => {
            let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                input.as_ref(),
                extension_codec,
            )?;
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
                    protobuf::DistinctNode {
                        input: Some(Box::new(input)),
                    },
                ))),
            })
        }
        LogicalPlan::Distinct(Distinct::On(DistinctOn {
            on_expr,
            select_expr,
            sort_expr,
            input,
            ..
        })) => {
            let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                input.as_ref(),
                extension_codec,
            )?;
            // An absent ORDER BY is sent as an empty sort list.
            let sort_expr = match sort_expr {
                None => vec![],
                Some(sort_expr) => serialize_sorts(sort_expr, extension_codec)?,
            };
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::DistinctOn(Box::new(
                    protobuf::DistinctOnNode {
                        on_expr: serialize_exprs(on_expr, extension_codec)?,
                        select_expr: serialize_exprs(select_expr, extension_codec)?,
                        sort_expr,
                        input: Some(Box::new(input)),
                    },
                ))),
            })
        }
        LogicalPlan::Window(Window {
            input, window_expr, ..
        }) => {
            let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                input.as_ref(),
                extension_codec,
            )?;
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Window(Box::new(
                    protobuf::WindowNode {
                        input: Some(Box::new(input)),
                        window_expr: serialize_exprs(window_expr, extension_codec)?,
                    },
                ))),
            })
        }
        LogicalPlan::Aggregate(Aggregate {
            group_expr,
            aggr_expr,
            input,
            ..
        }) => {
            let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                input.as_ref(),
                extension_codec,
            )?;
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
                    protobuf::AggregateNode {
                        input: Some(Box::new(input)),
                        group_expr: serialize_exprs(group_expr, extension_codec)?,
                        aggr_expr: serialize_exprs(aggr_expr, extension_codec)?,
                    },
                ))),
            })
        }
        LogicalPlan::Join(Join {
            left,
            right,
            on,
            filter,
            join_type,
            join_constraint,
            null_equals_null,
            ..
        }) => {
            let left: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                left.as_ref(),
                extension_codec,
            )?;
            let right: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                right.as_ref(),
                extension_codec,
            )?;
            // Split the `(left, right)` equi-join pairs into two parallel key
            // lists, failing on the first expression that cannot be encoded.
            let (left_join_key, right_join_key) = on
                .iter()
                .map(|(l, r)| {
                    Ok((
                        serialize_expr(l, extension_codec)?,
                        serialize_expr(r, extension_codec)?,
                    ))
                })
                .collect::<Result<Vec<_>, ToProtoError>>()?
                .into_iter()
                .unzip();
            let join_type: protobuf::JoinType = join_type.to_owned().into();
            let join_constraint: protobuf::JoinConstraint =
                join_constraint.to_owned().into();
            // Option<Result<_>> -> Result<Option<_>>: only an existing filter
            // can fail to serialize.
            let filter = filter
                .as_ref()
                .map(|e| serialize_expr(e, extension_codec))
                .transpose()?;
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Join(Box::new(
                    protobuf::JoinNode {
                        left: Some(Box::new(left)),
                        right: Some(Box::new(right)),
                        join_type: join_type.into(),
                        join_constraint: join_constraint.into(),
                        left_join_key,
                        right_join_key,
                        null_equals_null: *null_equals_null,
                        filter,
                    },
                ))),
            })
        }
        LogicalPlan::Subquery(_) => {
            not_impl_err!("LogicalPlan serde is not yet implemented for subqueries")
        }
        LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
            let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                input.as_ref(),
                extension_codec,
            )?;
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
                    protobuf::SubqueryAliasNode {
                        input: Some(Box::new(input)),
                        alias: Some((*alias).clone().into()),
                    },
                ))),
            })
        }
        LogicalPlan::Limit(limit) => {
            let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                limit.input.as_ref(),
                extension_codec,
            )?;
            // Only literal skip/fetch values have a protobuf representation.
            let SkipType::Literal(skip) = limit.get_skip_type()? else {
                return Err(proto_error(
                    "LogicalPlan::Limit only supports literal skip values",
                ));
            };
            let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
                return Err(proto_error(
                    "LogicalPlan::Limit only supports literal fetch values",
                ));
            };
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
                    protobuf::LimitNode {
                        input: Some(Box::new(input)),
                        // Saturate rather than wrap: a `usize` above `i64::MAX`
                        // must not become a negative wire value.
                        skip: i64::try_from(skip).unwrap_or(i64::MAX),
                        // `None` ("no limit") is encoded as -1, the same
                        // sentinel `SortNode.fetch` uses below. The previous
                        // `i64::MAX` sentinel would decode as a concrete (huge)
                        // limit if the decoder only treats negative values as
                        // "no limit". NOTE(review): confirm against the
                        // `LimitNode` decoder.
                        fetch: fetch
                            .map_or(-1, |f| i64::try_from(f).unwrap_or(i64::MAX)),
                    },
                ))),
            })
        }
        LogicalPlan::Sort(Sort { input, expr, fetch }) => {
            let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                input.as_ref(),
                extension_codec,
            )?;
            let sort_expr: Vec<protobuf::SortExprNode> =
                serialize_sorts(expr, extension_codec)?;
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
                    protobuf::SortNode {
                        input: Some(Box::new(input)),
                        expr: sort_expr,
                        // -1 stands for "no fetch limit".
                        fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
                    },
                ))),
            })
        }
        LogicalPlan::Repartition(Repartition {
            input,
            partitioning_scheme,
        }) => {
            use datafusion::logical_expr::Partitioning;
            let input: LogicalPlanNode = LogicalPlanNode::try_from_logical_plan(
                input.as_ref(),
                extension_codec,
            )?;
            // Partition counts are `usize` in memory and widened to `u64` on
            // the wire so the encoding does not depend on the pointer width.
            use protobuf::repartition_node::PartitionMethod;
            let pb_partition_method = match partitioning_scheme {
                Partitioning::Hash(exprs, partition_count) => {
                    PartitionMethod::Hash(protobuf::HashRepartition {
                        hash_expr: serialize_exprs(exprs, extension_codec)?,
                        partition_count: *partition_count as u64,
                    })
                }
                Partitioning::RoundRobinBatch(partition_count) => {
                    PartitionMethod::RoundRobin(*partition_count as u64)
                }
                Partitioning::DistributeBy(_) => {
                    return not_impl_err!("DistributeBy")
                }
            };
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
                    protobuf::RepartitionNode {
                        input: Some(Box::new(input)),
                        partition_method: Some(pb_partition_method),
                    },
                ))),
            })
        }
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row, ..
        }) => Ok(LogicalPlanNode {
            logical_plan_type: Some(LogicalPlanType::EmptyRelation(
                protobuf::EmptyRelationNode {
                    produce_one_row: *produce_one_row,
                },
            )),
        }),
        LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
            CreateExternalTable {
                name,
                location,
                file_type,
                schema: df_schema,
                table_partition_cols,
                if_not_exists,
                definition,
                order_exprs,
                unbounded,
                options,
                constraints,
                column_defaults,
                temporary,
            },
        )) => {
            // Each WITH ORDER clause becomes its own sort-expression list.
            let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
            for order in order_exprs {
                let temp = SortExprNodeCollection {
                    sort_expr_nodes: serialize_sorts(order, extension_codec)?,
                };
                converted_order_exprs.push(temp);
            }
            let mut converted_column_defaults =
                HashMap::with_capacity(column_defaults.len());
            for (col_name, expr) in column_defaults {
                converted_column_defaults
                    .insert(col_name.clone(), serialize_expr(expr, extension_codec)?);
            }
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
                    protobuf::CreateExternalTableNode {
                        name: Some(name.clone().into()),
                        location: location.clone(),
                        file_type: file_type.clone(),
                        schema: Some(df_schema.try_into()?),
                        table_partition_cols: table_partition_cols.clone(),
                        if_not_exists: *if_not_exists,
                        temporary: *temporary,
                        order_exprs: converted_order_exprs,
                        definition: definition.clone().unwrap_or_default(),
                        unbounded: *unbounded,
                        options: options.clone(),
                        constraints: Some(constraints.clone().into()),
                        column_defaults: converted_column_defaults,
                    },
                )),
            })
        }
        LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
            name,
            input,
            or_replace,
            definition,
            temporary,
        })) => Ok(LogicalPlanNode {
            logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
                protobuf::CreateViewNode {
                    name: Some(name.clone().into()),
                    input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
                        input,
                        extension_codec,
                    )?)),
                    or_replace: *or_replace,
                    temporary: *temporary,
                    definition: definition.clone().unwrap_or_default(),
                },
            ))),
        }),
        LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
            CreateCatalogSchema {
                schema_name,
                if_not_exists,
                schema: df_schema,
            },
        )) => Ok(LogicalPlanNode {
            logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
                protobuf::CreateCatalogSchemaNode {
                    schema_name: schema_name.clone(),
                    if_not_exists: *if_not_exists,
                    schema: Some(df_schema.try_into()?),
                },
            )),
        }),
        LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
            catalog_name,
            if_not_exists,
            schema: df_schema,
        })) => Ok(LogicalPlanNode {
            logical_plan_type: Some(LogicalPlanType::CreateCatalog(
                protobuf::CreateCatalogNode {
                    catalog_name: catalog_name.clone(),
                    if_not_exists: *if_not_exists,
                    schema: Some(df_schema.try_into()?),
                },
            )),
        }),
        LogicalPlan::Analyze(a) => {
            let input = LogicalPlanNode::try_from_logical_plan(
                a.input.as_ref(),
                extension_codec,
            )?;
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
                    protobuf::AnalyzeNode {
                        input: Some(Box::new(input)),
                        verbose: a.verbose,
                    },
                ))),
            })
        }
        LogicalPlan::Explain(a) => {
            // Only the explained plan and the `verbose` flag are sent.
            let input = LogicalPlanNode::try_from_logical_plan(
                a.plan.as_ref(),
                extension_codec,
            )?;
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
                    protobuf::ExplainNode {
                        input: Some(Box::new(input)),
                        verbose: a.verbose,
                    },
                ))),
            })
        }
        LogicalPlan::Union(union) => {
            let inputs: Vec<LogicalPlanNode> = union
                .inputs
                .iter()
                .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
                .collect::<Result<_>>()?;
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Union(
                    protobuf::UnionNode { inputs },
                )),
            })
        }
        LogicalPlan::Extension(extension) => {
            // The codec encodes the user-defined node itself; its inputs are
            // ordinary plans and are serialized here.
            let mut buf: Vec<u8> = vec![];
            extension_codec.try_encode(extension, &mut buf)?;
            let inputs: Vec<LogicalPlanNode> = extension
                .node
                .inputs()
                .iter()
                .map(|i| LogicalPlanNode::try_from_logical_plan(i, extension_codec))
                .collect::<Result<_>>()?;
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Extension(
                    LogicalExtensionNode { node: buf, inputs },
                )),
            })
        }
        LogicalPlan::Statement(Statement::Prepare(Prepare {
            name,
            data_types,
            input,
        })) => {
            let input =
                LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
                    protobuf::PrepareNode {
                        name: name.clone(),
                        data_types: data_types
                            .iter()
                            .map(|t| t.try_into())
                            .collect::<Result<Vec<_>, _>>()?,
                        input: Some(Box::new(input)),
                    },
                ))),
            })
        }
        LogicalPlan::Unnest(Unnest {
            input,
            exec_columns,
            list_type_columns,
            struct_type_columns,
            dependency_indices,
            schema,
            options,
        }) => {
            let input =
                LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
            // Each list column is sent as (input index, output column, depth).
            let proto_unnest_list_items = list_type_columns
                .iter()
                .map(|(index, ul)| ColumnUnnestListItem {
                    input_index: *index as _,
                    recursion: Some(ColumnUnnestListRecursion {
                        output_column: Some(ul.output_column.to_owned().into()),
                        depth: ul.depth as _,
                    }),
                })
                .collect();
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Unnest(Box::new(
                    protobuf::UnnestNode {
                        input: Some(Box::new(input)),
                        exec_columns: exec_columns
                            .iter()
                            .map(|col| col.into())
                            .collect(),
                        list_type_columns: proto_unnest_list_items,
                        struct_type_columns: struct_type_columns
                            .iter()
                            .map(|c| *c as u64)
                            .collect(),
                        dependency_indices: dependency_indices
                            .iter()
                            .map(|c| *c as u64)
                            .collect(),
                        schema: Some(schema.try_into()?),
                        options: Some(options.into()),
                    },
                ))),
            })
        }
        LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
            "LogicalPlan serde is not yet implemented for CreateMemoryTable",
        )),
        LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error(
            "LogicalPlan serde is not yet implemented for CreateIndex",
        )),
        LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
            "LogicalPlan serde is not yet implemented for DropTable",
        )),
        LogicalPlan::Ddl(DdlStatement::DropView(DropView {
            name,
            if_exists,
            schema,
        })) => Ok(LogicalPlanNode {
            logical_plan_type: Some(LogicalPlanType::DropView(
                protobuf::DropViewNode {
                    name: Some(name.clone().into()),
                    if_exists: *if_exists,
                    schema: Some(schema.try_into()?),
                },
            )),
        }),
        LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
            "LogicalPlan serde is not yet implemented for DropCatalogSchema",
        )),
        LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error(
            "LogicalPlan serde is not yet implemented for CreateFunction",
        )),
        LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error(
            "LogicalPlan serde is not yet implemented for DropFunction",
        )),
        // Every statement other than PREPARE (handled above) is unsupported.
        LogicalPlan::Statement(_) => Err(proto_error(
            "LogicalPlan serde is not yet implemented for Statement",
        )),
        LogicalPlan::Dml(DmlStatement {
            table_name,
            target,
            op,
            input,
            ..
        }) => {
            let input =
                LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
            let dml_type: dml_node::Type = op.into();
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::Dml(Box::new(DmlNode {
                    input: Some(Box::new(input)),
                    target: Some(Box::new(from_table_source(
                        table_name.clone(),
                        Arc::clone(target),
                        extension_codec,
                    )?)),
                    table_name: Some(table_name.clone().into()),
                    dml_type: dml_type.into(),
                }))),
            })
        }
        LogicalPlan::Copy(dml::CopyTo {
            input,
            output_url,
            file_type,
            partition_by,
            ..
        }) => {
            let input =
                LogicalPlanNode::try_from_logical_plan(input, extension_codec)?;
            // The output file format is opaque here and encoded by the codec.
            let mut buf = Vec::new();
            extension_codec
                .try_encode_file_format(&mut buf, file_type_to_format(file_type)?)?;
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
                    protobuf::CopyToNode {
                        input: Some(Box::new(input)),
                        output_url: output_url.to_string(),
                        file_type: buf,
                        partition_by: partition_by.clone(),
                    },
                ))),
            })
        }
        LogicalPlan::DescribeTable(_) => Err(proto_error(
            "LogicalPlan serde is not yet implemented for DescribeTable",
        )),
        LogicalPlan::RecursiveQuery(recursive) => {
            let static_term = LogicalPlanNode::try_from_logical_plan(
                recursive.static_term.as_ref(),
                extension_codec,
            )?;
            let recursive_term = LogicalPlanNode::try_from_logical_plan(
                recursive.recursive_term.as_ref(),
                extension_codec,
            )?;
            Ok(LogicalPlanNode {
                logical_plan_type: Some(LogicalPlanType::RecursiveQuery(Box::new(
                    protobuf::RecursiveQueryNode {
                        name: recursive.name.clone(),
                        static_term: Some(Box::new(static_term)),
                        recursive_term: Some(Box::new(recursive_term)),
                        is_distinct: recursive.is_distinct,
                    },
                ))),
            })
        }
    }
}