in datafusion/proto/src/logical_plan/mod.rs [212:787]
/// Deserializes this protobuf node (and, recursively, its inputs) into a
/// [`LogicalPlan`].
///
/// `ctx` is passed to expression parsing, table-factory lookup and the
/// extension codec; `extension_codec` decodes extension nodes and custom
/// table providers.
///
/// # Errors
///
/// Returns an error when the node carries no plan type, when a required
/// field is missing or malformed (for example a `Values` node with
/// `n_cols == 0` or a list length not divisible by `n_cols`, or a listing
/// partition column that is absent from the scan schema), or when building
/// the resulting plan fails.
fn try_into_logical_plan(
    &self,
    ctx: &SessionContext,
    extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
    let plan = self.logical_plan_type.as_ref().ok_or_else(|| {
        proto_error(format!(
            "logical_plan::from_proto() Unsupported logical plan '{self:?}'"
        ))
    })?;
    match plan {
        LogicalPlanType::Values(values) => {
            let n_cols = values.n_cols as usize;
            let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
                Ok(Vec::new())
            } else if n_cols == 0 {
                // Both `%` and `chunks_exact` below panic on a zero divisor /
                // chunk size, so reject this malformed input up front.
                Err(DataFusionError::Internal(format!(
                    "Invalid values list, expect n_cols to be non-zero for {} values",
                    values.values_list.len()
                )))
            } else if values.values_list.len() % n_cols != 0 {
                Err(DataFusionError::Internal(format!(
                    "Invalid values list length, expect {} to be divisible by {}",
                    values.values_list.len(),
                    n_cols
                )))
            } else {
                // The expressions are stored row-major; split them into rows
                // of exactly `n_cols` expressions each.
                values
                    .values_list
                    .chunks_exact(n_cols)
                    .map(|r| {
                        r.iter()
                            .map(|expr| from_proto::parse_expr(expr, ctx))
                            .collect::<Result<Vec<_>, from_proto::Error>>()
                    })
                    .collect::<Result<Vec<_>, _>>()
                    .map_err(|e| e.into())
            }?;
            LogicalPlanBuilder::values(values)?.build()
        }
        LogicalPlanType::Projection(projection) => {
            let input: LogicalPlan =
                into_logical_plan!(projection.input, ctx, extension_codec)?;
            let expr: Vec<Expr> = projection
                .expr
                .iter()
                .map(|expr| from_proto::parse_expr(expr, ctx))
                .collect::<Result<Vec<_>, _>>()?;
            let new_proj = project(input, expr)?;
            match projection.optional_alias.as_ref() {
                // An aliased projection is wrapped in a SubqueryAlias node.
                Some(protobuf::projection_node::OptionalAlias::Alias(alias)) => {
                    Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
                        new_proj,
                        alias.clone(),
                    )?))
                }
                None => Ok(new_proj),
            }
        }
        LogicalPlanType::Selection(selection) => {
            let input: LogicalPlan =
                into_logical_plan!(selection.input, ctx, extension_codec)?;
            let expr: Expr = selection
                .expr
                .as_ref()
                .map(|expr| from_proto::parse_expr(expr, ctx))
                .transpose()?
                .ok_or_else(|| {
                    DataFusionError::Internal("expression required".to_string())
                })?;
            LogicalPlanBuilder::from(input).filter(expr)?.build()
        }
        LogicalPlanType::Window(window) => {
            let input: LogicalPlan =
                into_logical_plan!(window.input, ctx, extension_codec)?;
            let window_expr = window
                .window_expr
                .iter()
                .map(|expr| from_proto::parse_expr(expr, ctx))
                .collect::<Result<Vec<Expr>, _>>()?;
            LogicalPlanBuilder::from(input).window(window_expr)?.build()
        }
        LogicalPlanType::Aggregate(aggregate) => {
            let input: LogicalPlan =
                into_logical_plan!(aggregate.input, ctx, extension_codec)?;
            let group_expr = aggregate
                .group_expr
                .iter()
                .map(|expr| from_proto::parse_expr(expr, ctx))
                .collect::<Result<Vec<Expr>, _>>()?;
            let aggr_expr = aggregate
                .aggr_expr
                .iter()
                .map(|expr| from_proto::parse_expr(expr, ctx))
                .collect::<Result<Vec<Expr>, _>>()?;
            LogicalPlanBuilder::from(input)
                .aggregate(group_expr, aggr_expr)?
                .build()
        }
        LogicalPlanType::ListingScan(scan) => {
            let schema: Schema = convert_required!(scan.schema)?;
            let mut projection = None;
            if let Some(columns) = &scan.projection {
                let column_indices = columns
                    .columns
                    .iter()
                    .map(|name| schema.index_of(name))
                    .collect::<Result<Vec<usize>, _>>()?;
                projection = Some(column_indices);
            }
            let filters = scan
                .filters
                .iter()
                .map(|expr| from_proto::parse_expr(expr, ctx))
                .collect::<Result<Vec<_>, _>>()?;
            let mut all_sort_orders = vec![];
            for order in &scan.file_sort_order {
                let file_sort_order = order
                    .logical_expr_nodes
                    .iter()
                    .map(|expr| from_proto::parse_expr(expr, ctx))
                    .collect::<Result<Vec<_>, _>>()?;
                all_sort_orders.push(file_sort_order)
            }
            let file_format: Arc<dyn FileFormat> =
                match scan.file_format_type.as_ref().ok_or_else(|| {
                    proto_error(format!(
                        "logical_plan::from_proto() Unsupported file format '{self:?}'"
                    ))
                })? {
                    &FileFormatType::Parquet(protobuf::ParquetFormat {}) => {
                        Arc::new(ParquetFormat::default())
                    }
                    FileFormatType::Csv(protobuf::CsvFormat {
                        has_header,
                        delimiter,
                        quote,
                        optional_escape,
                    }) => {
                        let mut csv = CsvFormat::default()
                            .with_has_header(*has_header)
                            .with_delimiter(str_to_byte(delimiter, "delimiter")?)
                            .with_quote(str_to_byte(quote, "quote")?);
                        if let Some(protobuf::csv_format::OptionalEscape::Escape(
                            escape,
                        )) = optional_escape
                        {
                            // The serialized escape byte configures the escape
                            // character; the quote set above is left intact.
                            csv = csv.with_escape(Some(str_to_byte(escape, "escape")?));
                        }
                        Arc::new(csv)
                    }
                    FileFormatType::Avro(..) => Arc::new(AvroFormat),
                };
            let table_paths = scan
                .paths
                .iter()
                .map(ListingTableUrl::parse)
                .collect::<Result<Vec<_>, _>>()?;
            // Look up each partition column's type in the scan schema. A
            // column missing from the schema is reported as an error rather
            // than panicking on malformed input.
            let table_partition_cols = scan
                .table_partition_cols
                .iter()
                .map(|col| -> Result<(String, DataType)> {
                    let data_type = schema.field_with_name(col)?.data_type().clone();
                    Ok((col.clone(), data_type))
                })
                .collect::<Result<Vec<_>>>()?;
            let options = ListingOptions::new(file_format)
                .with_file_extension(scan.file_extension.clone())
                .with_table_partition_cols(table_partition_cols)
                .with_collect_stat(scan.collect_stat)
                .with_target_partitions(scan.target_partitions as usize)
                .with_file_sort_order(all_sort_orders);
            let config = ListingTableConfig::new_with_multi_paths(table_paths)
                .with_listing_options(options)
                .with_schema(Arc::new(schema));
            let provider = ListingTable::try_new(config)?;
            let table_name = from_owned_table_reference(
                scan.table_name.as_ref(),
                "ListingTableScan",
            )?;
            LogicalPlanBuilder::scan_with_filters(
                table_name,
                provider_as_source(Arc::new(provider)),
                projection,
                filters,
            )?
            .build()
        }
        LogicalPlanType::CustomScan(scan) => {
            let schema: Schema = convert_required!(scan.schema)?;
            let schema = Arc::new(schema);
            let mut projection = None;
            if let Some(columns) = &scan.projection {
                let column_indices = columns
                    .columns
                    .iter()
                    .map(|name| schema.index_of(name))
                    .collect::<Result<Vec<usize>, _>>()?;
                projection = Some(column_indices);
            }
            let filters = scan
                .filters
                .iter()
                .map(|expr| from_proto::parse_expr(expr, ctx))
                .collect::<Result<Vec<_>, _>>()?;
            // The table provider itself is opaque to this crate; the
            // user-supplied codec reconstructs it from the raw bytes.
            let provider = extension_codec.try_decode_table_provider(
                &scan.custom_table_data,
                schema,
                ctx,
            )?;
            let table_name =
                from_owned_table_reference(scan.table_name.as_ref(), "CustomScan")?;
            LogicalPlanBuilder::scan_with_filters(
                table_name,
                provider_as_source(provider),
                projection,
                filters,
            )?
            .build()
        }
        LogicalPlanType::Sort(sort) => {
            let input: LogicalPlan =
                into_logical_plan!(sort.input, ctx, extension_codec)?;
            let sort_expr: Vec<Expr> = sort
                .expr
                .iter()
                .map(|expr| from_proto::parse_expr(expr, ctx))
                .collect::<Result<Vec<Expr>, _>>()?;
            LogicalPlanBuilder::from(input).sort(sort_expr)?.build()
        }
        LogicalPlanType::Repartition(repartition) => {
            use datafusion::logical_expr::Partitioning;
            let input: LogicalPlan =
                into_logical_plan!(repartition.input, ctx, extension_codec)?;
            use protobuf::repartition_node::PartitionMethod;
            // Borrow the method instead of cloning the whole message.
            let pb_partition_method = repartition.partition_method.as_ref().ok_or_else(|| {
                DataFusionError::Internal(String::from(
                    "Protobuf deserialization error, RepartitionNode was missing required field 'partition_method'",
                ))
            })?;
            let partitioning_scheme = match pb_partition_method {
                PartitionMethod::Hash(protobuf::HashRepartition {
                    hash_expr: pb_hash_expr,
                    partition_count,
                }) => Partitioning::Hash(
                    pb_hash_expr
                        .iter()
                        .map(|expr| from_proto::parse_expr(expr, ctx))
                        .collect::<Result<Vec<_>, _>>()?,
                    *partition_count as usize,
                ),
                PartitionMethod::RoundRobin(partition_count) => {
                    Partitioning::RoundRobinBatch(*partition_count as usize)
                }
            };
            LogicalPlanBuilder::from(input)
                .repartition(partitioning_scheme)?
                .build()
        }
        LogicalPlanType::EmptyRelation(empty_relation) => {
            LogicalPlanBuilder::empty(empty_relation.produce_one_row).build()
        }
        LogicalPlanType::CreateExternalTable(create_extern_table) => {
            let pb_schema = (create_extern_table.schema.clone()).ok_or_else(|| {
                DataFusionError::Internal(String::from(
                    "Protobuf deserialization error, CreateExternalTableNode was missing required field schema.",
                ))
            })?;
            // An empty definition string encodes "no definition".
            let definition = if !create_extern_table.definition.is_empty() {
                Some(create_extern_table.definition.clone())
            } else {
                None
            };
            let file_type = create_extern_table.file_type.as_str();
            if ctx.table_factory(file_type).is_none() {
                return Err(DataFusionError::Internal(format!(
                    "No TableProviderFactory for file type: {file_type}"
                )));
            }
            let mut order_exprs = vec![];
            for expr in &create_extern_table.order_exprs {
                let order_expr = expr
                    .logical_expr_nodes
                    .iter()
                    .map(|expr| from_proto::parse_expr(expr, ctx))
                    .collect::<Result<Vec<Expr>, _>>()?;
                order_exprs.push(order_expr)
            }
            Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(CreateExternalTable {
                schema: pb_schema.try_into()?,
                name: from_owned_table_reference(create_extern_table.name.as_ref(), "CreateExternalTable")?,
                location: create_extern_table.location.clone(),
                file_type: create_extern_table.file_type.clone(),
                has_header: create_extern_table.has_header,
                delimiter: create_extern_table.delimiter.chars().next().ok_or_else(|| {
                    DataFusionError::Internal(String::from("Protobuf deserialization error, unable to parse CSV delimiter"))
                })?,
                table_partition_cols: create_extern_table
                    .table_partition_cols
                    .clone(),
                order_exprs,
                if_not_exists: create_extern_table.if_not_exists,
                file_compression_type: CompressionTypeVariant::from_str(&create_extern_table.file_compression_type).map_err(|_| DataFusionError::NotImplemented(format!("Unsupported file compression type {}", create_extern_table.file_compression_type)))?,
                definition,
                unbounded: create_extern_table.unbounded,
                options: create_extern_table.options.clone(),
            })))
        }
        LogicalPlanType::CreateView(create_view) => {
            let plan = create_view
                .input.clone().ok_or_else(|| DataFusionError::Internal(String::from(
                    "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
                )))?
                .try_into_logical_plan(ctx, extension_codec)?;
            // An empty definition string encodes "no definition".
            let definition = if !create_view.definition.is_empty() {
                Some(create_view.definition.clone())
            } else {
                None
            };
            Ok(LogicalPlan::Ddl(DdlStatement::CreateView(CreateView {
                name: from_owned_table_reference(
                    create_view.name.as_ref(),
                    "CreateView",
                )?,
                input: Arc::new(plan),
                or_replace: create_view.or_replace,
                definition,
            })))
        }
        LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
            let pb_schema = (create_catalog_schema.schema.clone()).ok_or_else(|| {
                DataFusionError::Internal(String::from(
                    "Protobuf deserialization error, CreateCatalogSchemaNode was missing required field schema.",
                ))
            })?;
            Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalogSchema(
                CreateCatalogSchema {
                    schema_name: create_catalog_schema.schema_name.clone(),
                    if_not_exists: create_catalog_schema.if_not_exists,
                    schema: pb_schema.try_into()?,
                },
            )))
        }
        LogicalPlanType::CreateCatalog(create_catalog) => {
            let pb_schema = (create_catalog.schema.clone()).ok_or_else(|| {
                DataFusionError::Internal(String::from(
                    "Protobuf deserialization error, CreateCatalogNode was missing required field schema.",
                ))
            })?;
            Ok(LogicalPlan::Ddl(DdlStatement::CreateCatalog(
                CreateCatalog {
                    catalog_name: create_catalog.catalog_name.clone(),
                    if_not_exists: create_catalog.if_not_exists,
                    schema: pb_schema.try_into()?,
                },
            )))
        }
        LogicalPlanType::Analyze(analyze) => {
            let input: LogicalPlan =
                into_logical_plan!(analyze.input, ctx, extension_codec)?;
            // ANALYZE is encoded as an explain with the `analyze` flag set.
            LogicalPlanBuilder::from(input)
                .explain(analyze.verbose, true)?
                .build()
        }
        LogicalPlanType::Explain(explain) => {
            let input: LogicalPlan =
                into_logical_plan!(explain.input, ctx, extension_codec)?;
            LogicalPlanBuilder::from(input)
                .explain(explain.verbose, false)?
                .build()
        }
        LogicalPlanType::SubqueryAlias(aliased_relation) => {
            let input: LogicalPlan =
                into_logical_plan!(aliased_relation.input, ctx, extension_codec)?;
            let alias = from_owned_table_reference(
                aliased_relation.alias.as_ref(),
                "SubqueryAlias",
            )?;
            LogicalPlanBuilder::from(input).alias(alias)?.build()
        }
        LogicalPlanType::Limit(limit) => {
            let input: LogicalPlan =
                into_logical_plan!(limit.input, ctx, extension_codec)?;
            // A negative skip is clamped to 0; a negative fetch means "no limit".
            let skip = limit.skip.max(0) as usize;
            let fetch = if limit.fetch < 0 {
                None
            } else {
                Some(limit.fetch as usize)
            };
            LogicalPlanBuilder::from(input).limit(skip, fetch)?.build()
        }
        LogicalPlanType::Join(join) => {
            let left_keys: Vec<Expr> = join
                .left_join_key
                .iter()
                .map(|expr| from_proto::parse_expr(expr, ctx))
                .collect::<Result<Vec<_>, _>>()?;
            let right_keys: Vec<Expr> = join
                .right_join_key
                .iter()
                .map(|expr| from_proto::parse_expr(expr, ctx))
                .collect::<Result<Vec<_>, _>>()?;
            let join_type =
                protobuf::JoinType::from_i32(join.join_type).ok_or_else(|| {
                    proto_error(format!(
                        "Received a JoinNode message with unknown JoinType {}",
                        join.join_type
                    ))
                })?;
            let join_constraint = protobuf::JoinConstraint::from_i32(
                join.join_constraint,
            )
            .ok_or_else(|| {
                proto_error(format!(
                    "Received a JoinNode message with unknown JoinConstraint {}",
                    join.join_constraint
                ))
            })?;
            let filter: Option<Expr> = join
                .filter
                .as_ref()
                .map(|expr| from_proto::parse_expr(expr, ctx))
                .transpose()?;
            let builder = LogicalPlanBuilder::from(into_logical_plan!(
                join.left,
                ctx,
                extension_codec
            )?);
            let builder = match join_constraint.into() {
                JoinConstraint::On => builder.join_with_expr_keys(
                    into_logical_plan!(join.right, ctx, extension_codec)?,
                    join_type.into(),
                    (left_keys, right_keys),
                    filter,
                )?,
                JoinConstraint::Using => {
                    // The equijoin keys in using-join must be column.
                    let using_keys = left_keys
                        .into_iter()
                        .map(|key| key.try_into_col())
                        .collect::<Result<Vec<_>, _>>()?;
                    builder.join_using(
                        into_logical_plan!(join.right, ctx, extension_codec)?,
                        join_type.into(),
                        using_keys,
                    )?
                }
            };
            builder.build()
        }
        LogicalPlanType::Union(union) => {
            let input_plans: Vec<LogicalPlan> = union
                .inputs
                .iter()
                .map(|i| i.try_into_logical_plan(ctx, extension_codec))
                .collect::<Result<_>>()?;
            if input_plans.len() < 2 {
                return Err(DataFusionError::Internal(String::from(
                    "Protobuf deserialization error, Union was require at least two input.",
                )));
            }
            // Preserve the serialized input order: the union's output schema
            // is derived from its first input, so the first plan must stay first.
            let mut input_plans = input_plans.into_iter();
            let first = input_plans.next().ok_or_else(|| DataFusionError::Internal(String::from(
                "Protobuf deserialization error, Union was require at least two input.",
            )))?;
            let mut builder = LogicalPlanBuilder::from(first);
            for plan in input_plans {
                builder = builder.union(plan)?;
            }
            builder.build()
        }
        LogicalPlanType::CrossJoin(crossjoin) => {
            let left = into_logical_plan!(crossjoin.left, ctx, extension_codec)?;
            let right = into_logical_plan!(crossjoin.right, ctx, extension_codec)?;
            LogicalPlanBuilder::from(left).cross_join(right)?.build()
        }
        LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
            let input_plans: Vec<LogicalPlan> = inputs
                .iter()
                .map(|i| i.try_into_logical_plan(ctx, extension_codec))
                .collect::<Result<_>>()?;
            // The node payload is opaque; the user-supplied codec rebuilds it
            // on top of the already-decoded inputs.
            let extension_node =
                extension_codec.try_decode(node, &input_plans, ctx)?;
            Ok(LogicalPlan::Extension(extension_node))
        }
        LogicalPlanType::Distinct(distinct) => {
            let input: LogicalPlan =
                into_logical_plan!(distinct.input, ctx, extension_codec)?;
            LogicalPlanBuilder::from(input).distinct()?.build()
        }
        LogicalPlanType::ViewScan(scan) => {
            let schema: Schema = convert_required!(scan.schema)?;
            let mut projection = None;
            if let Some(columns) = &scan.projection {
                let column_indices = columns
                    .columns
                    .iter()
                    .map(|name| schema.index_of(name))
                    .collect::<Result<Vec<usize>, _>>()?;
                projection = Some(column_indices);
            }
            let input: LogicalPlan =
                into_logical_plan!(scan.input, ctx, extension_codec)?;
            // An empty definition string encodes "no definition".
            let definition = if !scan.definition.is_empty() {
                Some(scan.definition.clone())
            } else {
                None
            };
            let provider = ViewTable::try_new(input, definition)?;
            let table_name =
                from_owned_table_reference(scan.table_name.as_ref(), "ViewScan")?;
            LogicalPlanBuilder::scan(
                table_name,
                provider_as_source(Arc::new(provider)),
                projection,
            )?
            .build()
        }
        LogicalPlanType::Prepare(prepare) => {
            let input: LogicalPlan =
                into_logical_plan!(prepare.input, ctx, extension_codec)?;
            let data_types: Vec<DataType> = prepare
                .data_types
                .iter()
                .map(DataType::try_from)
                .collect::<Result<_, _>>()?;
            LogicalPlanBuilder::from(input)
                .prepare(prepare.name.clone(), data_types)?
                .build()
        }
        LogicalPlanType::DropView(dropview) => Ok(datafusion_expr::LogicalPlan::Ddl(
            datafusion_expr::DdlStatement::DropView(DropView {
                name: from_owned_table_reference(dropview.name.as_ref(), "DropView")?,
                if_exists: dropview.if_exists,
                schema: Arc::new(convert_required!(dropview.schema)?),
            }),
        )),
    }
}