in datafusion/sql/src/unparser/plan.rs [327:979]
/// Recursively unparses `plan` into the SQL AST builders supplied by the caller.
///
/// Every visited node contributes to one or more of three builders:
///
/// * `query` – statement-level clauses: `ORDER BY`, `LIMIT`/`OFFSET`, and the
///   set-operation body produced for `UNION`.
/// * `select` – the current `SELECT`: projection, `WHERE`, `HAVING`, `GROUP BY`,
///   `DISTINCT`, and the `FROM` list that joins are appended to.
/// * `relation` – the relation for the current `FROM` item (a table, a derived
///   subquery, an `UNNEST` table factor, ...).
///
/// Nodes are visited top-down. Once `select` already has a projection, nodes that
/// would need a `SELECT` of their own (Projection, Limit, Sort, Distinct, Union)
/// are emitted as a derived table through `derive_with_dialect_alias` instead of
/// being merged into the current `SELECT`.
///
/// Semi, anti and mark joins are rendered as `[NOT] EXISTS (SELECT 1 FROM ...)`.
/// Filters that were peeled off the join inputs are kept: those from the probed
/// side go inside the EXISTS subquery, and those from the returned side go into
/// the outer `WHERE`.
///
/// # Errors
///
/// * An internal error when a node that needs statement context (`Limit`, `Sort`,
///   `Union`, or `DISTINCT ON` with a sort) is reached while `query` is `None`.
/// * An internal error for malformed inputs, such as a `Union` with fewer than two
///   inputs or an `Unnest` whose input is not a `Projection`.
/// * A not-implemented error for plan nodes this unparser does not support.
/// * Any error returned by the expression/helper conversions it calls.
fn select_to_sql_recursively(
    &self,
    plan: &LogicalPlan,
    query: &mut Option<QueryBuilder>,
    select: &mut SelectBuilder,
    relation: &mut RelationBuilder,
) -> Result<()> {
    match plan {
        LogicalPlan::TableScan(scan) => {
            // If `unparse_table_scan_pushdown` rewrites this scan into another plan,
            // unparse the rewritten plan instead.
            if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
                plan,
                None,
                select.already_projected(),
            )? {
                return self.select_to_sql_recursively(
                    &unparsed_table_scan,
                    query,
                    select,
                    relation,
                );
            }
            // Build the table name, qualified by catalog and schema when present.
            let mut builder = TableRelationBuilder::default();
            let mut table_parts = vec![];
            if let Some(catalog_name) = scan.table_name.catalog() {
                table_parts
                    .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
            }
            if let Some(schema_name) = scan.table_name.schema() {
                table_parts
                    .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
            }
            table_parts.push(
                self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
            );
            builder.name(ast::ObjectName::from(table_parts));
            relation.table(builder);
            Ok(())
        }
        LogicalPlan::Projection(p) => {
            // Sorts on columns the projection does not output need the plan rewritten first.
            if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
                return self
                    .select_to_sql_recursively(&new_plan, query, select, relation);
            }
            // Projection can be top-level plan for unnest relation
            // The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have
            // only one expression, which is the placeholder column generated by the rewriter.
            let unnest_input_type = if p.expr.len() == 1 {
                Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
            } else {
                None
            };
            // Dialects that accept UNNEST as a table factor: emit it in FROM directly.
            if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() {
                if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
                    if let Some(unnest_relation) =
                        self.try_unnest_to_table_factor_sql(unnest)?
                    {
                        relation.unnest(unnest_relation);
                        return self.select_to_sql_recursively(
                            p.input.as_ref(),
                            query,
                            select,
                            relation,
                        );
                    }
                }
            }
            // If it's a unnest projection, we should provide the table column alias
            // to provide a column name for the unnest relation.
            let columns = if unnest_input_type.is_some() {
                p.expr
                    .iter()
                    .map(|e| {
                        self.new_ident_quoted_if_needs(e.schema_name().to_string())
                    })
                    .collect()
            } else {
                vec![]
            };
            // Projection can be top-level plan for derived table
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_projection",
                    plan,
                    relation,
                    unnest_input_type
                        .filter(|t| matches!(t, UnnestInputType::OuterReference))
                        .is_some(),
                    columns,
                );
            }
            self.reconstruct_select_statement(plan, p, select)?;
            self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
        }
        LogicalPlan::Filter(filter) => {
            // A filter above an aggregate of this SELECT becomes HAVING, after its
            // references are rewritten by `unproject_agg_exprs`; otherwise it is WHERE.
            if let Some(agg) =
                find_agg_node_within_select(plan, select.already_projected())
            {
                let unprojected =
                    unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
                let filter_expr = self.expr_to_sql(&unprojected)?;
                select.having(Some(filter_expr));
            } else {
                let filter_expr = self.expr_to_sql(&filter.predicate)?;
                select.selection(Some(filter_expr));
            }
            self.select_to_sql_recursively(
                filter.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Limit(limit) => {
            // Limit can be top-level plan for derived table
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_limit",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }
            if let Some(fetch) = &limit.fetch {
                let Some(query) = query.as_mut() else {
                    return internal_err!(
                        "Limit operator only valid in a statement context."
                    );
                };
                query.limit(Some(self.expr_to_sql(fetch)?));
            }
            if let Some(skip) = &limit.skip {
                let Some(query) = query.as_mut() else {
                    return internal_err!(
                        "Offset operator only valid in a statement context."
                    );
                };
                query.offset(Some(ast::Offset {
                    rows: ast::OffsetRows::None,
                    value: self.expr_to_sql(skip)?,
                }));
            }
            self.select_to_sql_recursively(
                limit.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Sort(sort) => {
            // Sort can be top-level plan for derived table
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_sort",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }
            let Some(query_ref) = query else {
                return internal_err!(
                    "Sort operator only valid in a statement context."
                );
            };
            // A Sort with a fetch (top-k) is rendered as ORDER BY ... LIMIT n.
            if let Some(fetch) = sort.fetch {
                query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
                    fetch.to_string(),
                    false,
                ))));
            };
            let agg = find_agg_node_within_select(plan, select.already_projected());
            // unproject sort expressions
            let sort_exprs: Vec<SortExpr> = sort
                .expr
                .iter()
                .map(|sort_expr| {
                    unproject_sort_expr(sort_expr, agg, sort.input.as_ref())
                })
                .collect::<Result<Vec<_>>>()?;
            query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);
            self.select_to_sql_recursively(
                sort.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Aggregate(agg) => {
            // Aggregation can be already handled in the projection case
            if !select.already_projected() {
                // The query returns aggregate and group expressions. If that weren't the case,
                // the aggregate would have been placed inside a projection, making the check above^ false
                let exprs: Vec<_> = agg
                    .aggr_expr
                    .iter()
                    .chain(agg.group_expr.iter())
                    .map(|expr| self.select_item_to_sql(expr))
                    .collect::<Result<Vec<_>>>()?;
                select.projection(exprs);
                select.group_by(ast::GroupByExpr::Expressions(
                    agg.group_expr
                        .iter()
                        .map(|expr| self.expr_to_sql(expr))
                        .collect::<Result<Vec<_>>>()?,
                    vec![],
                ));
            }
            self.select_to_sql_recursively(
                agg.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::Distinct(distinct) => {
            // Distinct can be top-level plan for derived table
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_distinct",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }
            // If this distinct is the parent of a Union and we're in a query context,
            // then we need to unparse as a `UNION` rather than a `UNION ALL`.
            if let Distinct::All(input) = distinct {
                if matches!(input.as_ref(), LogicalPlan::Union(_)) {
                    if let Some(query_mut) = query.as_mut() {
                        query_mut.distinct_union();
                        return self.select_to_sql_recursively(
                            input.as_ref(),
                            query,
                            select,
                            relation,
                        );
                    }
                }
            }
            // SELECT DISTINCT, or SELECT DISTINCT ON (...) with its own projection and
            // optional ORDER BY.
            let (select_distinct, input) = match distinct {
                Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
                Distinct::On(on) => {
                    let exprs = on
                        .on_expr
                        .iter()
                        .map(|e| self.expr_to_sql(e))
                        .collect::<Result<Vec<_>>>()?;
                    let items = on
                        .select_expr
                        .iter()
                        .map(|e| self.select_item_to_sql(e))
                        .collect::<Result<Vec<_>>>()?;
                    if let Some(sort_expr) = &on.sort_expr {
                        if let Some(query_ref) = query {
                            query_ref.order_by(self.sorts_to_sql(sort_expr)?);
                        } else {
                            return internal_err!(
                                "Sort operator only valid in a statement context."
                            );
                        }
                    }
                    select.projection(items);
                    (ast::Distinct::On(exprs), on.input.as_ref())
                }
            };
            select.distinct(Some(select_distinct));
            self.select_to_sql_recursively(input, query, select, relation)
        }
        LogicalPlan::Join(join) => {
            // RIGHT SEMI/ANTI joins return rows of `join.right`, so the inputs are swapped.
            // Below, `left_plan` is the side whose rows are returned, and `right_plan` is
            // the side probed by the EXISTS rewrite.
            let (left_plan, right_plan) = match join.join_type {
                JoinType::RightSemi | JoinType::RightAnti => {
                    (&join.right, &join.left)
                }
                _ => (&join.left, &join.right),
            };
            // If there's an outer projection plan, it will already set up the projection.
            // In that case, we don't need to worry about setting up the projection here.
            // The outer projection plan will handle projecting the correct columns.
            let already_projected = select.already_projected();

            // Filters peeled off each input by
            // `try_transform_to_simple_table_scan_with_filters` are tracked per side.
            // Semi/anti/mark joins must place them differently from inner/outer joins.
            let mut left_scan_filters: Vec<Expr> = vec![];
            let left_plan =
                match try_transform_to_simple_table_scan_with_filters(left_plan)? {
                    Some((plan, filters)) => {
                        left_scan_filters.extend(filters);
                        Arc::new(plan)
                    }
                    None => Arc::clone(left_plan),
                };

            self.select_to_sql_recursively(
                left_plan.as_ref(),
                query,
                select,
                relation,
            )?;

            // Without an outer projection, take the left input's projection out of
            // `select` so it can be recombined once the join is built.
            let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
            {
                Some(select.pop_projections())
            } else {
                None
            };

            let mut right_scan_filters: Vec<Expr> = vec![];
            let right_plan =
                match try_transform_to_simple_table_scan_with_filters(right_plan)? {
                    Some((plan, filters)) => {
                        right_scan_filters.extend(filters);
                        Arc::new(plan)
                    }
                    None => Arc::clone(right_plan),
                };

            // The right input is unparsed into its own relation. It becomes either the
            // joined relation or the FROM of an EXISTS subquery.
            let mut right_relation = RelationBuilder::default();

            self.select_to_sql_recursively(
                right_plan.as_ref(),
                query,
                select,
                &mut right_relation,
            )?;

            // For inner/outer joins, every peeled-off filter (left side first, then right
            // side) is ANDed onto `join.filter` and rendered in the join constraint.
            let table_scan_filters: Vec<Expr> = left_scan_filters
                .iter()
                .chain(right_scan_filters.iter())
                .cloned()
                .collect();
            let join_filters = if table_scan_filters.is_empty() {
                join.filter.clone()
            } else {
                // Combine `table_scan_filters` into a single filter using `AND`
                let Some(combined_filters) =
                    table_scan_filters.into_iter().reduce(|acc, filter| {
                        Expr::BinaryExpr(BinaryExpr {
                            left: Box::new(acc),
                            op: Operator::And,
                            right: Box::new(filter),
                        })
                    })
                else {
                    return internal_err!("Failed to combine TableScan filters");
                };

                // Combine `join.filter` with `combined_filters` using `AND`
                match &join.filter {
                    Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
                        left: Box::new(filter.clone()),
                        op: Operator::And,
                        right: Box::new(combined_filters),
                    })),
                    None => Some(combined_filters),
                }
            };

            let join_constraint = self.join_constraint_to_sql(
                join.join_constraint,
                &join.on,
                join_filters.as_ref(),
            )?;

            // NOTE(review): `right_plan` is unparsed a second time into the same `select`
            // and `right_relation`. On this pass `select` may already hold a projection,
            // which changes which branch some nodes (e.g. Projection) take. Other nodes
            // may re-apply their effects. Confirm which pass is intended before removing
            // either call.
            self.select_to_sql_recursively(
                right_plan.as_ref(),
                query,
                select,
                &mut right_relation,
            )?;

            let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
            {
                Some(select.pop_projections())
            } else {
                None
            };

            match join.join_type {
                // Rendered as `[NOT] EXISTS (SELECT 1 FROM <right> WHERE <conditions>)`.
                JoinType::LeftSemi
                | JoinType::LeftAnti
                | JoinType::LeftMark
                | JoinType::RightSemi
                | JoinType::RightAnti => {
                    let mut query_builder = QueryBuilder::default();
                    let mut from = TableWithJoinsBuilder::default();
                    let mut exists_select: SelectBuilder = SelectBuilder::default();
                    from.relation(right_relation);
                    exists_select.push_from(from);
                    if let Some(filter) = &join.filter {
                        exists_select.selection(Some(self.expr_to_sql(filter)?));
                    }
                    for (left, right) in &join.on {
                        exists_select.selection(Some(
                            self.expr_to_sql(&left.clone().eq(right.clone()))?,
                        ));
                    }
                    // Filters peeled off the probed side restricted which of its rows
                    // could match, so they belong inside the EXISTS subquery.
                    // `join_constraint` is not used on this path, so without this
                    // loop they would be lost.
                    for filter in &right_scan_filters {
                        exists_select.selection(Some(self.expr_to_sql(filter)?));
                    }
                    exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
                        ast::Expr::value(ast::Value::Number("1".to_string(), false)),
                    )]);
                    query_builder.body(Box::new(SetExpr::Select(Box::new(
                        exists_select.build()?,
                    ))));

                    let negated = match join.join_type {
                        JoinType::LeftSemi
                        | JoinType::RightSemi
                        | JoinType::LeftMark => false,
                        JoinType::LeftAnti | JoinType::RightAnti => true,
                        _ => unreachable!(),
                    };
                    let exists_expr = ast::Expr::Exists {
                        subquery: Box::new(query_builder.build()?),
                        negated,
                    };
                    if join.join_type == JoinType::LeftMark {
                        // The mark column ("mark", qualified like the right input's first
                        // field) is handed to `replace_mark` together with the EXISTS
                        // expression, presumably to substitute one for the other.
                        let (table_ref, _) = right_plan.schema().qualified_field(0);
                        let column = self
                            .col_to_sql(&Column::new(table_ref.cloned(), "mark"))?;
                        select.replace_mark(&column, &exists_expr);
                    } else {
                        select.selection(Some(exists_expr));
                    }
                    // Filters peeled off the returned side restricted that input before
                    // the join. Moving them into a (possibly negated) EXISTS would change
                    // the result, so they filter the outer query instead.
                    for filter in &left_scan_filters {
                        select.selection(Some(self.expr_to_sql(filter)?));
                    }
                    if let Some(projection) = left_projection {
                        select.projection(projection);
                    }
                }
                JoinType::Inner
                | JoinType::Left
                | JoinType::Right
                | JoinType::Full => {
                    // NOTE(review): peeled-off input filters are folded into the ON
                    // condition above. For LEFT/RIGHT/FULL joins, an ON predicate on a
                    // preserved side does not remove that side's rows, unlike the
                    // original pre-join filter. Confirm whether such filters need a
                    // WHERE clause or a derived table instead.
                    let Ok(Some(relation)) = right_relation.build() else {
                        return internal_err!("Failed to build right relation");
                    };
                    let ast_join = ast::Join {
                        relation,
                        global: false,
                        join_operator: self
                            .join_operator_to_sql(join.join_type, join_constraint)?,
                    };
                    // Append the join to the last FROM item of the current SELECT.
                    let mut from = select.pop_from().unwrap();
                    from.push_join(ast_join);
                    select.push_from(from);
                    if !already_projected {
                        let Some(left_projection) = left_projection else {
                            return internal_err!("Left projection is missing");
                        };
                        let Some(right_projection) = right_projection else {
                            return internal_err!("Right projection is missing");
                        };
                        let projection = left_projection
                            .into_iter()
                            .chain(right_projection.into_iter())
                            .collect();
                        select.projection(projection);
                    }
                }
            };
            Ok(())
        }
        LogicalPlan::SubqueryAlias(plan_alias) => {
            let (plan, mut columns) =
                subquery_alias_inner_query_and_columns(plan_alias);
            let unparsed_table_scan = Self::unparse_table_scan_pushdown(
                plan,
                Some(plan_alias.alias.clone()),
                select.already_projected(),
            )?;
            // if the child plan is a TableScan with pushdown operations, we don't need to
            // create an additional subquery for it
            if !select.already_projected() && unparsed_table_scan.is_none() {
                select.projection(vec![ast::SelectItem::Wildcard(
                    ast::WildcardAdditionalOptions::default(),
                )]);
            }
            let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
            if !columns.is_empty()
                && !self.dialect.supports_column_alias_in_table_alias()
            {
                // Instead of specifying column aliases as part of the outer table, inject them directly into the inner projection
                let rewritten_plan =
                    match inject_column_aliases_into_subquery(plan, columns) {
                        Ok(p) => p,
                        Err(e) => {
                            return internal_err!(
                                "Failed to transform SubqueryAlias plan: {e}"
                            )
                        }
                    };

                // The aliases now live in the inner projection, so the table alias
                // below is emitted without a column list.
                columns = vec![];

                self.select_to_sql_recursively(
                    &rewritten_plan,
                    query,
                    select,
                    relation,
                )?;
            } else {
                self.select_to_sql_recursively(&plan, query, select, relation)?;
            }

            relation.alias(Some(
                self.new_table_alias(plan_alias.alias.table().to_string(), columns),
            ));

            Ok(())
        }
        LogicalPlan::Union(union) => {
            // Covers cases where the UNION is a subquery and the projection is at the top level
            if select.already_projected() {
                return self.derive_with_dialect_alias(
                    "derived_union",
                    plan,
                    relation,
                    false,
                    vec![],
                );
            }

            // Each input is unparsed as its own set expression.
            let input_exprs: Vec<SetExpr> = union
                .inputs
                .iter()
                .map(|input| self.select_to_sql_expr(input, query))
                .collect::<Result<Vec<_>>>()?;

            if input_exprs.len() < 2 {
                return internal_err!("UNION operator requires at least 2 inputs");
            }

            let set_quantifier =
                if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
                    // Setting the SetQuantifier to None will unparse as a `UNION`
                    // rather than a `UNION ALL`.
                    ast::SetQuantifier::None
                } else {
                    ast::SetQuantifier::All
                };

            // Build the union expression tree bottom-up by reversing the order
            // note that we are also swapping left and right inputs because of the rev
            let union_expr = input_exprs
                .into_iter()
                .rev()
                .reduce(|a, b| SetExpr::SetOperation {
                    op: ast::SetOperator::Union,
                    set_quantifier,
                    left: Box::new(b),
                    right: Box::new(a),
                })
                .unwrap();

            let Some(query) = query.as_mut() else {
                return internal_err!(
                    "UNION ALL operator only valid in a statement context"
                );
            };
            query.body(Box::new(union_expr));

            Ok(())
        }
        LogicalPlan::Window(window) => {
            // Window nodes are handled simultaneously with Projection nodes
            self.select_to_sql_recursively(
                window.input.as_ref(),
                query,
                select,
                relation,
            )
        }
        LogicalPlan::EmptyRelation(_) => {
            // An EmptyRelation could be behind an UNNEST node. If the dialect supports UNNEST as a table factor,
            // a TableRelationBuilder will be created for the UNNEST node first.
            if !relation.has_relation() {
                relation.empty();
            }
            Ok(())
        }
        LogicalPlan::Extension(extension) => {
            // Extension nodes are delegated to `extension_to_sql`; `query` is passed
            // only when this call has a statement context.
            if let Some(query) = query.as_mut() {
                self.extension_to_sql(
                    extension.node.as_ref(),
                    &mut Some(query),
                    &mut Some(select),
                    &mut Some(relation),
                )
            } else {
                self.extension_to_sql(
                    extension.node.as_ref(),
                    &mut None,
                    &mut Some(select),
                    &mut Some(relation),
                )
            }
        }
        LogicalPlan::Unnest(unnest) => {
            if !unnest.struct_type_columns.is_empty() {
                return internal_err!(
                    "Struct type columns are not currently supported in UNNEST: {:?}",
                    unnest.struct_type_columns
                );
            }

            // In the case of UNNEST, the Unnest node is followed by a duplicate Projection node that we should skip.
            // Otherwise, there will be a duplicate SELECT clause.
            // | Projection: table.col1, UNNEST(table.col2)
            // |   Unnest: UNNEST(table.col2)
            // |     Projection: table.col1, table.col2 AS UNNEST(table.col2)
            // |       Filter: table.col3 = Int64(3)
            // |         TableScan: table projection=None
            if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
                // continue with projection input
                self.select_to_sql_recursively(&p.input, query, select, relation)
            } else {
                internal_err!("Unnest input is not a Projection: {unnest:?}")
            }
        }
        LogicalPlan::Subquery(subquery)
            if find_unnest_node_until_relation(subquery.subquery.as_ref())
                .is_some() =>
        {
            // A subquery containing an UNNEST is either inlined (UNNEST as table factor)
            // or emitted as a derived table with the flag argument set to `true`.
            if self.dialect.unnest_as_table_factor() {
                self.select_to_sql_recursively(
                    subquery.subquery.as_ref(),
                    query,
                    select,
                    relation,
                )
            } else {
                self.derive_with_dialect_alias(
                    "derived_unnest",
                    subquery.subquery.as_ref(),
                    relation,
                    true,
                    vec![],
                )
            }
        }
        _ => {
            not_impl_err!("Unsupported operator: {plan:?}")
        }
    }
}