fn select_to_sql_recursively()

in datafusion/sql/src/unparser/plan.rs [327:979]


    fn select_to_sql_recursively(
        &self,
        plan: &LogicalPlan,
        query: &mut Option<QueryBuilder>,
        select: &mut SelectBuilder,
        relation: &mut RelationBuilder,
    ) -> Result<()> {
        match plan {
            LogicalPlan::TableScan(scan) => {
                if let Some(unparsed_table_scan) = Self::unparse_table_scan_pushdown(
                    plan,
                    None,
                    select.already_projected(),
                )? {
                    return self.select_to_sql_recursively(
                        &unparsed_table_scan,
                        query,
                        select,
                        relation,
                    );
                }
                let mut builder = TableRelationBuilder::default();
                let mut table_parts = vec![];
                if let Some(catalog_name) = scan.table_name.catalog() {
                    table_parts
                        .push(self.new_ident_quoted_if_needs(catalog_name.to_string()));
                }
                if let Some(schema_name) = scan.table_name.schema() {
                    table_parts
                        .push(self.new_ident_quoted_if_needs(schema_name.to_string()));
                }
                table_parts.push(
                    self.new_ident_quoted_if_needs(scan.table_name.table().to_string()),
                );
                builder.name(ast::ObjectName::from(table_parts));
                relation.table(builder);

                Ok(())
            }
            LogicalPlan::Projection(p) => {
                if let Some(new_plan) = rewrite_plan_for_sort_on_non_projected_fields(p) {
                    return self
                        .select_to_sql_recursively(&new_plan, query, select, relation);
                }

                // Projection can be top-level plan for unnest relation
                // The projection generated by the `RecursiveUnnestRewriter` from a UNNEST relation will have
                // only one expression, which is the placeholder column generated by the rewriter.
                let unnest_input_type = if p.expr.len() == 1 {
                    Self::check_unnest_placeholder_with_outer_ref(&p.expr[0])
                } else {
                    None
                };
                if self.dialect.unnest_as_table_factor() && unnest_input_type.is_some() {
                    if let LogicalPlan::Unnest(unnest) = &p.input.as_ref() {
                        if let Some(unnest_relation) =
                            self.try_unnest_to_table_factor_sql(unnest)?
                        {
                            relation.unnest(unnest_relation);
                            return self.select_to_sql_recursively(
                                p.input.as_ref(),
                                query,
                                select,
                                relation,
                            );
                        }
                    }
                }

                // If it's a unnest projection, we should provide the table column alias
                // to provide a column name for the unnest relation.
                let columns = if unnest_input_type.is_some() {
                    p.expr
                        .iter()
                        .map(|e| {
                            self.new_ident_quoted_if_needs(e.schema_name().to_string())
                        })
                        .collect()
                } else {
                    vec![]
                };
                // Projection can be top-level plan for derived table
                if select.already_projected() {
                    return self.derive_with_dialect_alias(
                        "derived_projection",
                        plan,
                        relation,
                        unnest_input_type
                            .filter(|t| matches!(t, UnnestInputType::OuterReference))
                            .is_some(),
                        columns,
                    );
                }
                self.reconstruct_select_statement(plan, p, select)?;
                self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
            }
            LogicalPlan::Filter(filter) => {
                if let Some(agg) =
                    find_agg_node_within_select(plan, select.already_projected())
                {
                    let unprojected =
                        unproject_agg_exprs(filter.predicate.clone(), agg, None)?;
                    let filter_expr = self.expr_to_sql(&unprojected)?;
                    select.having(Some(filter_expr));
                } else {
                    let filter_expr = self.expr_to_sql(&filter.predicate)?;
                    select.selection(Some(filter_expr));
                }

                self.select_to_sql_recursively(
                    filter.input.as_ref(),
                    query,
                    select,
                    relation,
                )
            }
            LogicalPlan::Limit(limit) => {
                // Limit can be top-level plan for derived table
                if select.already_projected() {
                    return self.derive_with_dialect_alias(
                        "derived_limit",
                        plan,
                        relation,
                        false,
                        vec![],
                    );
                }
                if let Some(fetch) = &limit.fetch {
                    let Some(query) = query.as_mut() else {
                        return internal_err!(
                            "Limit operator only valid in a statement context."
                        );
                    };
                    query.limit(Some(self.expr_to_sql(fetch)?));
                }

                if let Some(skip) = &limit.skip {
                    let Some(query) = query.as_mut() else {
                        return internal_err!(
                            "Offset operator only valid in a statement context."
                        );
                    };
                    query.offset(Some(ast::Offset {
                        rows: ast::OffsetRows::None,
                        value: self.expr_to_sql(skip)?,
                    }));
                }

                self.select_to_sql_recursively(
                    limit.input.as_ref(),
                    query,
                    select,
                    relation,
                )
            }
            LogicalPlan::Sort(sort) => {
                // Sort can be top-level plan for derived table
                if select.already_projected() {
                    return self.derive_with_dialect_alias(
                        "derived_sort",
                        plan,
                        relation,
                        false,
                        vec![],
                    );
                }
                let Some(query_ref) = query else {
                    return internal_err!(
                        "Sort operator only valid in a statement context."
                    );
                };

                if let Some(fetch) = sort.fetch {
                    query_ref.limit(Some(ast::Expr::value(ast::Value::Number(
                        fetch.to_string(),
                        false,
                    ))));
                };

                let agg = find_agg_node_within_select(plan, select.already_projected());
                // unproject sort expressions
                let sort_exprs: Vec<SortExpr> = sort
                    .expr
                    .iter()
                    .map(|sort_expr| {
                        unproject_sort_expr(sort_expr, agg, sort.input.as_ref())
                    })
                    .collect::<Result<Vec<_>>>()?;

                query_ref.order_by(self.sorts_to_sql(&sort_exprs)?);

                self.select_to_sql_recursively(
                    sort.input.as_ref(),
                    query,
                    select,
                    relation,
                )
            }
            LogicalPlan::Aggregate(agg) => {
                // Aggregation can be already handled in the projection case
                if !select.already_projected() {
                    // The query returns aggregate and group expressions. If that weren't the case,
                    // the aggregate would have been placed inside a projection, making the check above^ false
                    let exprs: Vec<_> = agg
                        .aggr_expr
                        .iter()
                        .chain(agg.group_expr.iter())
                        .map(|expr| self.select_item_to_sql(expr))
                        .collect::<Result<Vec<_>>>()?;
                    select.projection(exprs);

                    select.group_by(ast::GroupByExpr::Expressions(
                        agg.group_expr
                            .iter()
                            .map(|expr| self.expr_to_sql(expr))
                            .collect::<Result<Vec<_>>>()?,
                        vec![],
                    ));
                }

                self.select_to_sql_recursively(
                    agg.input.as_ref(),
                    query,
                    select,
                    relation,
                )
            }
            LogicalPlan::Distinct(distinct) => {
                // Distinct can be top-level plan for derived table
                if select.already_projected() {
                    return self.derive_with_dialect_alias(
                        "derived_distinct",
                        plan,
                        relation,
                        false,
                        vec![],
                    );
                }

                // If this distinct is the parent of a Union and we're in a query context,
                // then we need to unparse as a `UNION` rather than a `UNION ALL`.
                if let Distinct::All(input) = distinct {
                    if matches!(input.as_ref(), LogicalPlan::Union(_)) {
                        if let Some(query_mut) = query.as_mut() {
                            query_mut.distinct_union();
                            return self.select_to_sql_recursively(
                                input.as_ref(),
                                query,
                                select,
                                relation,
                            );
                        }
                    }
                }

                let (select_distinct, input) = match distinct {
                    Distinct::All(input) => (ast::Distinct::Distinct, input.as_ref()),
                    Distinct::On(on) => {
                        let exprs = on
                            .on_expr
                            .iter()
                            .map(|e| self.expr_to_sql(e))
                            .collect::<Result<Vec<_>>>()?;
                        let items = on
                            .select_expr
                            .iter()
                            .map(|e| self.select_item_to_sql(e))
                            .collect::<Result<Vec<_>>>()?;
                        if let Some(sort_expr) = &on.sort_expr {
                            if let Some(query_ref) = query {
                                query_ref.order_by(self.sorts_to_sql(sort_expr)?);
                            } else {
                                return internal_err!(
                                    "Sort operator only valid in a statement context."
                                );
                            }
                        }
                        select.projection(items);
                        (ast::Distinct::On(exprs), on.input.as_ref())
                    }
                };
                select.distinct(Some(select_distinct));
                self.select_to_sql_recursively(input, query, select, relation)
            }
            LogicalPlan::Join(join) => {
                let mut table_scan_filters = vec![];
                let (left_plan, right_plan) = match join.join_type {
                    JoinType::RightSemi | JoinType::RightAnti => {
                        (&join.right, &join.left)
                    }
                    _ => (&join.left, &join.right),
                };
                // If there's an outer projection plan, it will already set up the projection.
                // In that case, we don't need to worry about setting up the projection here.
                // The outer projection plan will handle projecting the correct columns.
                let already_projected = select.already_projected();

                let left_plan =
                    match try_transform_to_simple_table_scan_with_filters(left_plan)? {
                        Some((plan, filters)) => {
                            table_scan_filters.extend(filters);
                            Arc::new(plan)
                        }
                        None => Arc::clone(left_plan),
                    };

                self.select_to_sql_recursively(
                    left_plan.as_ref(),
                    query,
                    select,
                    relation,
                )?;

                let left_projection: Option<Vec<ast::SelectItem>> = if !already_projected
                {
                    Some(select.pop_projections())
                } else {
                    None
                };

                let right_plan =
                    match try_transform_to_simple_table_scan_with_filters(right_plan)? {
                        Some((plan, filters)) => {
                            table_scan_filters.extend(filters);
                            Arc::new(plan)
                        }
                        None => Arc::clone(right_plan),
                    };

                let mut right_relation = RelationBuilder::default();

                self.select_to_sql_recursively(
                    right_plan.as_ref(),
                    query,
                    select,
                    &mut right_relation,
                )?;

                let join_filters = if table_scan_filters.is_empty() {
                    join.filter.clone()
                } else {
                    // Combine `table_scan_filters` into a single filter using `AND`
                    let Some(combined_filters) =
                        table_scan_filters.into_iter().reduce(|acc, filter| {
                            Expr::BinaryExpr(BinaryExpr {
                                left: Box::new(acc),
                                op: Operator::And,
                                right: Box::new(filter),
                            })
                        })
                    else {
                        return internal_err!("Failed to combine TableScan filters");
                    };

                    // Combine `join.filter` with `combined_filters` using `AND`
                    match &join.filter {
                        Some(filter) => Some(Expr::BinaryExpr(BinaryExpr {
                            left: Box::new(filter.clone()),
                            op: Operator::And,
                            right: Box::new(combined_filters),
                        })),
                        None => Some(combined_filters),
                    }
                };

                let join_constraint = self.join_constraint_to_sql(
                    join.join_constraint,
                    &join.on,
                    join_filters.as_ref(),
                )?;

                self.select_to_sql_recursively(
                    right_plan.as_ref(),
                    query,
                    select,
                    &mut right_relation,
                )?;

                let right_projection: Option<Vec<ast::SelectItem>> = if !already_projected
                {
                    Some(select.pop_projections())
                } else {
                    None
                };

                match join.join_type {
                    JoinType::LeftSemi
                    | JoinType::LeftAnti
                    | JoinType::LeftMark
                    | JoinType::RightSemi
                    | JoinType::RightAnti => {
                        let mut query_builder = QueryBuilder::default();
                        let mut from = TableWithJoinsBuilder::default();
                        let mut exists_select: SelectBuilder = SelectBuilder::default();
                        from.relation(right_relation);
                        exists_select.push_from(from);
                        if let Some(filter) = &join.filter {
                            exists_select.selection(Some(self.expr_to_sql(filter)?));
                        }
                        for (left, right) in &join.on {
                            exists_select.selection(Some(
                                self.expr_to_sql(&left.clone().eq(right.clone()))?,
                            ));
                        }
                        exists_select.projection(vec![ast::SelectItem::UnnamedExpr(
                            ast::Expr::value(ast::Value::Number("1".to_string(), false)),
                        )]);
                        query_builder.body(Box::new(SetExpr::Select(Box::new(
                            exists_select.build()?,
                        ))));

                        let negated = match join.join_type {
                            JoinType::LeftSemi
                            | JoinType::RightSemi
                            | JoinType::LeftMark => false,
                            JoinType::LeftAnti | JoinType::RightAnti => true,
                            _ => unreachable!(),
                        };
                        let exists_expr = ast::Expr::Exists {
                            subquery: Box::new(query_builder.build()?),
                            negated,
                        };
                        if join.join_type == JoinType::LeftMark {
                            let (table_ref, _) = right_plan.schema().qualified_field(0);
                            let column = self
                                .col_to_sql(&Column::new(table_ref.cloned(), "mark"))?;
                            select.replace_mark(&column, &exists_expr);
                        } else {
                            select.selection(Some(exists_expr));
                        }
                        if let Some(projection) = left_projection {
                            select.projection(projection);
                        }
                    }
                    JoinType::Inner
                    | JoinType::Left
                    | JoinType::Right
                    | JoinType::Full => {
                        let Ok(Some(relation)) = right_relation.build() else {
                            return internal_err!("Failed to build right relation");
                        };
                        let ast_join = ast::Join {
                            relation,
                            global: false,
                            join_operator: self
                                .join_operator_to_sql(join.join_type, join_constraint)?,
                        };
                        let mut from = select.pop_from().unwrap();
                        from.push_join(ast_join);
                        select.push_from(from);
                        if !already_projected {
                            let Some(left_projection) = left_projection else {
                                return internal_err!("Left projection is missing");
                            };

                            let Some(right_projection) = right_projection else {
                                return internal_err!("Right projection is missing");
                            };

                            let projection = left_projection
                                .into_iter()
                                .chain(right_projection.into_iter())
                                .collect();
                            select.projection(projection);
                        }
                    }
                };

                Ok(())
            }
            LogicalPlan::SubqueryAlias(plan_alias) => {
                let (plan, mut columns) =
                    subquery_alias_inner_query_and_columns(plan_alias);
                let unparsed_table_scan = Self::unparse_table_scan_pushdown(
                    plan,
                    Some(plan_alias.alias.clone()),
                    select.already_projected(),
                )?;
                // if the child plan is a TableScan with pushdown operations, we don't need to
                // create an additional subquery for it
                if !select.already_projected() && unparsed_table_scan.is_none() {
                    select.projection(vec![ast::SelectItem::Wildcard(
                        ast::WildcardAdditionalOptions::default(),
                    )]);
                }
                let plan = unparsed_table_scan.unwrap_or_else(|| plan.clone());
                if !columns.is_empty()
                    && !self.dialect.supports_column_alias_in_table_alias()
                {
                    // Instead of specifying column aliases as part of the outer table, inject them directly into the inner projection
                    let rewritten_plan =
                        match inject_column_aliases_into_subquery(plan, columns) {
                            Ok(p) => p,
                            Err(e) => {
                                return internal_err!(
                                    "Failed to transform SubqueryAlias plan: {e}"
                                )
                            }
                        };

                    columns = vec![];

                    self.select_to_sql_recursively(
                        &rewritten_plan,
                        query,
                        select,
                        relation,
                    )?;
                } else {
                    self.select_to_sql_recursively(&plan, query, select, relation)?;
                }

                relation.alias(Some(
                    self.new_table_alias(plan_alias.alias.table().to_string(), columns),
                ));

                Ok(())
            }
            LogicalPlan::Union(union) => {
                // Covers cases where the UNION is a subquery and the projection is at the top level
                if select.already_projected() {
                    return self.derive_with_dialect_alias(
                        "derived_union",
                        plan,
                        relation,
                        false,
                        vec![],
                    );
                }

                let input_exprs: Vec<SetExpr> = union
                    .inputs
                    .iter()
                    .map(|input| self.select_to_sql_expr(input, query))
                    .collect::<Result<Vec<_>>>()?;

                if input_exprs.len() < 2 {
                    return internal_err!("UNION operator requires at least 2 inputs");
                }

                let set_quantifier =
                    if query.as_ref().is_some_and(|q| q.is_distinct_union()) {
                        // Setting the SetQuantifier to None will unparse as a `UNION`
                        // rather than a `UNION ALL`.
                        ast::SetQuantifier::None
                    } else {
                        ast::SetQuantifier::All
                    };

                // Build the union expression tree bottom-up by reversing the order
                // note that we are also swapping left and right inputs because of the rev
                let union_expr = input_exprs
                    .into_iter()
                    .rev()
                    .reduce(|a, b| SetExpr::SetOperation {
                        op: ast::SetOperator::Union,
                        set_quantifier,
                        left: Box::new(b),
                        right: Box::new(a),
                    })
                    .unwrap();

                let Some(query) = query.as_mut() else {
                    return internal_err!(
                        "UNION ALL operator only valid in a statement context"
                    );
                };
                query.body(Box::new(union_expr));

                Ok(())
            }
            LogicalPlan::Window(window) => {
                // Window nodes are handled simultaneously with Projection nodes
                self.select_to_sql_recursively(
                    window.input.as_ref(),
                    query,
                    select,
                    relation,
                )
            }
            LogicalPlan::EmptyRelation(_) => {
                // An EmptyRelation could be behind an UNNEST node. If the dialect supports UNNEST as a table factor,
                // a TableRelationBuilder will be created for the UNNEST node first.
                if !relation.has_relation() {
                    relation.empty();
                }
                Ok(())
            }
            LogicalPlan::Extension(extension) => {
                if let Some(query) = query.as_mut() {
                    self.extension_to_sql(
                        extension.node.as_ref(),
                        &mut Some(query),
                        &mut Some(select),
                        &mut Some(relation),
                    )
                } else {
                    self.extension_to_sql(
                        extension.node.as_ref(),
                        &mut None,
                        &mut Some(select),
                        &mut Some(relation),
                    )
                }
            }
            LogicalPlan::Unnest(unnest) => {
                if !unnest.struct_type_columns.is_empty() {
                    return internal_err!(
                        "Struct type columns are not currently supported in UNNEST: {:?}",
                        unnest.struct_type_columns
                    );
                }

                // In the case of UNNEST, the Unnest node is followed by a duplicate Projection node that we should skip.
                // Otherwise, there will be a duplicate SELECT clause.
                // | Projection: table.col1, UNNEST(table.col2)
                // |   Unnest: UNNEST(table.col2)
                // |     Projection: table.col1, table.col2 AS UNNEST(table.col2)
                // |       Filter: table.col3 = Int64(3)
                // |         TableScan: table projection=None
                if let LogicalPlan::Projection(p) = unnest.input.as_ref() {
                    // continue with projection input
                    self.select_to_sql_recursively(&p.input, query, select, relation)
                } else {
                    internal_err!("Unnest input is not a Projection: {unnest:?}")
                }
            }
            LogicalPlan::Subquery(subquery)
                if find_unnest_node_until_relation(subquery.subquery.as_ref())
                    .is_some() =>
            {
                if self.dialect.unnest_as_table_factor() {
                    self.select_to_sql_recursively(
                        subquery.subquery.as_ref(),
                        query,
                        select,
                        relation,
                    )
                } else {
                    self.derive_with_dialect_alias(
                        "derived_unnest",
                        subquery.subquery.as_ref(),
                        relation,
                        true,
                        vec![],
                    )
                }
            }
            _ => {
                not_impl_err!("Unsupported operator: {plan:?}")
            }
        }
    }