in datafusion/proto/src/logical_plan/mod.rs [789:1436]
fn try_from_logical_plan(
plan: &LogicalPlan,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<Self>
where
Self: Sized,
{
match plan {
LogicalPlan::Values(Values { values, .. }) => {
let n_cols = if values.is_empty() {
0
} else {
values[0].len()
} as u64;
let values_list = values
.iter()
.flatten()
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Values(
protobuf::ValuesNode {
n_cols,
values_list,
},
)),
})
}
LogicalPlan::TableScan(TableScan {
table_name,
source,
filters,
projection,
..
}) => {
let provider = source_as_provider(source)?;
let schema = provider.schema();
let source = provider.as_any();
let projection = match projection {
None => None,
Some(columns) => {
let column_names = columns
.iter()
.map(|i| schema.field(*i).name().to_owned())
.collect();
Some(protobuf::ProjectionColumns {
columns: column_names,
})
}
};
let schema: protobuf::Schema = schema.as_ref().try_into()?;
let filters: Vec<protobuf::LogicalExprNode> = filters
.iter()
.map(|filter| filter.try_into())
.collect::<Result<Vec<_>, _>>()?;
if let Some(listing_table) = source.downcast_ref::<ListingTable>() {
let any = listing_table.options().format.as_any();
let file_format_type = if any.is::<ParquetFormat>() {
FileFormatType::Parquet(protobuf::ParquetFormat {})
} else if let Some(csv) = any.downcast_ref::<CsvFormat>() {
FileFormatType::Csv(protobuf::CsvFormat {
delimiter: byte_to_string(csv.delimiter(), "delimiter")?,
has_header: csv.has_header(),
quote: byte_to_string(csv.quote(), "quote")?,
optional_escape: if let Some(escape) = csv.escape() {
Some(protobuf::csv_format::OptionalEscape::Escape(
byte_to_string(escape, "escape")?,
))
} else {
None
},
})
} else if any.is::<AvroFormat>() {
FileFormatType::Avro(protobuf::AvroFormat {})
} else {
return Err(proto_error(format!(
"Error converting file format, {:?} is invalid as a datafusion format.",
listing_table.options().format
)));
};
let options = listing_table.options();
let mut exprs_vec: Vec<LogicalExprNodeCollection> = vec![];
for order in &options.file_sort_order {
let expr_vec = LogicalExprNodeCollection {
logical_expr_nodes: order
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, to_proto::Error>>()?,
};
exprs_vec.push(expr_vec);
}
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::ListingScan(
protobuf::ListingTableScanNode {
file_format_type: Some(file_format_type),
table_name: Some(table_name.clone().into()),
collect_stat: options.collect_stat,
file_extension: options.file_extension.clone(),
table_partition_cols: options
.table_partition_cols
.iter()
.map(|x| x.0.clone())
.collect::<Vec<_>>(),
paths: listing_table
.table_paths()
.iter()
.map(|x| x.to_string())
.collect(),
schema: Some(schema),
projection,
filters,
target_partitions: options.target_partitions as u32,
file_sort_order: exprs_vec,
},
)),
})
} else if let Some(view_table) = source.downcast_ref::<ViewTable>() {
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::ViewScan(Box::new(
protobuf::ViewTableScanNode {
table_name: Some(table_name.clone().into()),
input: Some(Box::new(
protobuf::LogicalPlanNode::try_from_logical_plan(
view_table.logical_plan(),
extension_codec,
)?,
)),
schema: Some(schema),
projection,
definition: view_table
.definition()
.map(|s| s.to_string())
.unwrap_or_default(),
},
))),
})
} else {
let mut bytes = vec![];
extension_codec
.try_encode_table_provider(provider, &mut bytes)
.map_err(|e| context!("Error serializing custom table", e))?;
let scan = CustomScan(CustomTableScanNode {
table_name: Some(table_name.clone().into()),
projection,
schema: Some(schema),
filters,
custom_table_data: bytes,
});
let node = LogicalPlanNode {
logical_plan_type: Some(scan),
};
Ok(node)
}
}
LogicalPlan::Projection(Projection { expr, input, .. }) => {
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Projection(Box::new(
protobuf::ProjectionNode {
input: Some(Box::new(
protobuf::LogicalPlanNode::try_from_logical_plan(
input.as_ref(),
extension_codec,
)?,
)),
expr: expr
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, to_proto::Error>>()?,
optional_alias: None,
},
))),
})
}
LogicalPlan::Filter(filter) => {
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
filter.input.as_ref(),
extension_codec,
)?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Selection(Box::new(
protobuf::SelectionNode {
input: Some(Box::new(input)),
expr: Some((&filter.predicate).try_into()?),
},
))),
})
}
LogicalPlan::Distinct(Distinct { input }) => {
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
input.as_ref(),
extension_codec,
)?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Distinct(Box::new(
protobuf::DistinctNode {
input: Some(Box::new(input)),
},
))),
})
}
LogicalPlan::Window(Window {
input, window_expr, ..
}) => {
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
input.as_ref(),
extension_codec,
)?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Window(Box::new(
protobuf::WindowNode {
input: Some(Box::new(input)),
window_expr: window_expr
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, _>>()?,
},
))),
})
}
LogicalPlan::Aggregate(Aggregate {
group_expr,
aggr_expr,
input,
..
}) => {
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
input.as_ref(),
extension_codec,
)?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Aggregate(Box::new(
protobuf::AggregateNode {
input: Some(Box::new(input)),
group_expr: group_expr
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, _>>()?,
aggr_expr: aggr_expr
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, _>>()?,
},
))),
})
}
LogicalPlan::Join(Join {
left,
right,
on,
filter,
join_type,
join_constraint,
null_equals_null,
..
}) => {
let left: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
left.as_ref(),
extension_codec,
)?;
let right: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
right.as_ref(),
extension_codec,
)?;
let (left_join_key, right_join_key) = on
.iter()
.map(|(l, r)| Ok((l.try_into()?, r.try_into()?)))
.collect::<Result<Vec<_>, to_proto::Error>>()?
.into_iter()
.unzip();
let join_type: protobuf::JoinType = join_type.to_owned().into();
let join_constraint: protobuf::JoinConstraint =
join_constraint.to_owned().into();
let filter = filter
.as_ref()
.map(|e| e.try_into())
.map_or(Ok(None), |v| v.map(Some))?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Join(Box::new(
protobuf::JoinNode {
left: Some(Box::new(left)),
right: Some(Box::new(right)),
join_type: join_type.into(),
join_constraint: join_constraint.into(),
left_join_key,
right_join_key,
null_equals_null: *null_equals_null,
filter,
},
))),
})
}
LogicalPlan::Subquery(_) => Err(DataFusionError::NotImplemented(
"LogicalPlan serde is not yet implemented for subqueries".to_string(),
)),
LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
input.as_ref(),
extension_codec,
)?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::SubqueryAlias(Box::new(
protobuf::SubqueryAliasNode {
input: Some(Box::new(input)),
alias: Some(alias.to_owned_reference().into()),
},
))),
})
}
LogicalPlan::Limit(Limit { input, skip, fetch }) => {
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
input.as_ref(),
extension_codec,
)?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Limit(Box::new(
protobuf::LimitNode {
input: Some(Box::new(input)),
skip: *skip as i64,
fetch: fetch.unwrap_or(i64::MAX as usize) as i64,
},
))),
})
}
LogicalPlan::Sort(Sort { input, expr, fetch }) => {
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
input.as_ref(),
extension_codec,
)?;
let selection_expr: Vec<protobuf::LogicalExprNode> = expr
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, to_proto::Error>>()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Sort(Box::new(
protobuf::SortNode {
input: Some(Box::new(input)),
expr: selection_expr,
fetch: fetch.map(|f| f as i64).unwrap_or(-1i64),
},
))),
})
}
LogicalPlan::Repartition(Repartition {
input,
partitioning_scheme,
}) => {
use datafusion::logical_expr::Partitioning;
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
input.as_ref(),
extension_codec,
)?;
// Assumed common usize field was batch size
// Used u64 to avoid any nastyness involving large values, most data clusters are probably uniformly 64 bits any ways
use protobuf::repartition_node::PartitionMethod;
let pb_partition_method = match partitioning_scheme {
Partitioning::Hash(exprs, partition_count) => {
PartitionMethod::Hash(protobuf::HashRepartition {
hash_expr: exprs
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, to_proto::Error>>()?,
partition_count: *partition_count as u64,
})
}
Partitioning::RoundRobinBatch(partition_count) => {
PartitionMethod::RoundRobin(*partition_count as u64)
}
Partitioning::DistributeBy(_) => {
return Err(DataFusionError::NotImplemented(
"DistributeBy".to_string(),
))
}
};
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Repartition(Box::new(
protobuf::RepartitionNode {
input: Some(Box::new(input)),
partition_method: Some(pb_partition_method),
},
))),
})
}
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row, ..
}) => Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::EmptyRelation(
protobuf::EmptyRelationNode {
produce_one_row: *produce_one_row,
},
)),
}),
LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
CreateExternalTable {
name,
location,
file_type,
has_header,
delimiter,
schema: df_schema,
table_partition_cols,
if_not_exists,
definition,
file_compression_type,
order_exprs,
unbounded,
options,
},
)) => {
let mut converted_order_exprs: Vec<LogicalExprNodeCollection> = vec![];
for order in order_exprs {
let temp = LogicalExprNodeCollection {
logical_expr_nodes: order
.iter()
.map(|expr| expr.try_into())
.collect::<Result<Vec<_>, to_proto::Error>>(
)?,
};
converted_order_exprs.push(temp);
}
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
protobuf::CreateExternalTableNode {
name: Some(name.clone().into()),
location: location.clone(),
file_type: file_type.clone(),
has_header: *has_header,
schema: Some(df_schema.try_into()?),
table_partition_cols: table_partition_cols.clone(),
if_not_exists: *if_not_exists,
delimiter: String::from(*delimiter),
order_exprs: converted_order_exprs,
definition: definition.clone().unwrap_or_default(),
file_compression_type: file_compression_type.to_string(),
unbounded: *unbounded,
options: options.clone(),
},
)),
})
}
LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
name,
input,
or_replace,
definition,
})) => Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
protobuf::CreateViewNode {
name: Some(name.clone().into()),
input: Some(Box::new(LogicalPlanNode::try_from_logical_plan(
input,
extension_codec,
)?)),
or_replace: *or_replace,
definition: definition.clone().unwrap_or_default(),
},
))),
}),
LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
CreateCatalogSchema {
schema_name,
if_not_exists,
schema: df_schema,
},
)) => Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
protobuf::CreateCatalogSchemaNode {
schema_name: schema_name.clone(),
if_not_exists: *if_not_exists,
schema: Some(df_schema.try_into()?),
},
)),
}),
LogicalPlan::Ddl(DdlStatement::CreateCatalog(CreateCatalog {
catalog_name,
if_not_exists,
schema: df_schema,
})) => Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CreateCatalog(
protobuf::CreateCatalogNode {
catalog_name: catalog_name.clone(),
if_not_exists: *if_not_exists,
schema: Some(df_schema.try_into()?),
},
)),
}),
LogicalPlan::Analyze(a) => {
let input = protobuf::LogicalPlanNode::try_from_logical_plan(
a.input.as_ref(),
extension_codec,
)?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Analyze(Box::new(
protobuf::AnalyzeNode {
input: Some(Box::new(input)),
verbose: a.verbose,
},
))),
})
}
LogicalPlan::Explain(a) => {
let input = protobuf::LogicalPlanNode::try_from_logical_plan(
a.plan.as_ref(),
extension_codec,
)?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Explain(Box::new(
protobuf::ExplainNode {
input: Some(Box::new(input)),
verbose: a.verbose,
},
))),
})
}
LogicalPlan::Union(union) => {
let inputs: Vec<LogicalPlanNode> = union
.inputs
.iter()
.map(|i| {
protobuf::LogicalPlanNode::try_from_logical_plan(
i,
extension_codec,
)
})
.collect::<Result<_>>()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Union(
protobuf::UnionNode { inputs },
)),
})
}
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
let left = protobuf::LogicalPlanNode::try_from_logical_plan(
left.as_ref(),
extension_codec,
)?;
let right = protobuf::LogicalPlanNode::try_from_logical_plan(
right.as_ref(),
extension_codec,
)?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CrossJoin(Box::new(
protobuf::CrossJoinNode {
left: Some(Box::new(left)),
right: Some(Box::new(right)),
},
))),
})
}
LogicalPlan::Extension(extension) => {
let mut buf: Vec<u8> = vec![];
extension_codec.try_encode(extension, &mut buf)?;
let inputs: Vec<LogicalPlanNode> = extension
.node
.inputs()
.iter()
.map(|i| {
protobuf::LogicalPlanNode::try_from_logical_plan(
i,
extension_codec,
)
})
.collect::<Result<_>>()?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Extension(
LogicalExtensionNode { node: buf, inputs },
)),
})
}
LogicalPlan::Prepare(Prepare {
name,
data_types,
input,
}) => {
let input = protobuf::LogicalPlanNode::try_from_logical_plan(
input,
extension_codec,
)?;
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Prepare(Box::new(
protobuf::PrepareNode {
name: name.clone(),
data_types: data_types
.iter()
.map(|t| t.try_into())
.collect::<Result<Vec<_>, _>>()?,
input: Some(Box::new(input)),
},
))),
})
}
LogicalPlan::Unnest(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for Unnest",
)),
LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error(
"LogicalPlan serde is not yet implemented for CreateMemoryTable",
)),
LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error(
"LogicalPlan serde is not yet implemented for DropTable",
)),
LogicalPlan::Ddl(DdlStatement::DropView(DropView {
name,
if_exists,
schema,
})) => Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::DropView(
protobuf::DropViewNode {
name: Some(name.clone().into()),
if_exists: *if_exists,
schema: Some(schema.try_into()?),
},
)),
}),
LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error(
"LogicalPlan serde is not yet implemented for DropCatalogSchema",
)),
LogicalPlan::Statement(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for Statement",
)),
LogicalPlan::Dml(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for Dml",
)),
LogicalPlan::DescribeTable(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for DescribeTable",
)),
}
}