fn unparse_table_scan_pushdown()

in datafusion/sql/src/unparser/plan.rs [1041:1195]


    /// Rebuilds a `TableScan` that carries pushed-down operations as an explicit
    /// chain of plan nodes (scan, optional projection, filter, limit, alias).
    ///
    /// Only three plan shapes are handled:
    ///
    /// * `TableScan` - rebuilt from scratch via `LogicalPlanBuilder::scan`, then the
    ///   scan's `projection`, `filters` and `fetch` are re-applied as separate nodes.
    /// * `SubqueryAlias` - recurses into the input using the subquery's own alias,
    ///   then wraps the result in the caller-provided `alias` (if any).
    /// * `Projection` - recurses into the input and re-applies the projection
    ///   expressions on top, rewritten through `TableAliasRewriter` when an alias
    ///   is present.
    ///
    /// # Arguments
    ///
    /// * `plan` - the plan node to inspect.
    /// * `alias` - alias that column references should be rebased onto, if any.
    /// * `already_projected` - when `true`, the scan's `projection` is not turned
    ///   into a `Projection` node, avoiding a duplicate projection (and therefore
    ///   an extra subquery) when one already exists above the scan.
    ///
    /// # Returns
    ///
    /// `Ok(None)` when `plan` is any other variant, or when
    /// `Self::is_scan_with_pushdown` reports `false` for the scan; otherwise
    /// `Ok(Some(rewritten_plan))`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the plan builder or by expression rewriting.
    fn unparse_table_scan_pushdown(
        plan: &LogicalPlan,
        alias: Option<TableReference>,
        already_projected: bool,
    ) -> Result<Option<LogicalPlan>> {
        match plan {
            LogicalPlan::TableScan(table_scan) => {
                if !Self::is_scan_with_pushdown(table_scan) {
                    return Ok(None);
                }
                // Fetched once and shared by the projection and the filter rewriter.
                let table_schema = table_scan.source.schema();

                // When the scan has a projection or filters, column references are
                // rebased onto the alias, so the alias must sit directly on the scan.
                // Otherwise nothing is rebased and the alias is appended at the very end.
                let rebases_columns =
                    table_scan.projection.is_some() || !table_scan.filters.is_empty();

                let mut builder = LogicalPlanBuilder::scan(
                    table_scan.table_name.clone(),
                    Arc::clone(&table_scan.source),
                    None,
                )?;

                // Example:
                //   select t1.c1 from t1 where t1.c1 > 1 -> select a.c1 from t1 as a where a.c1 > 1
                if rebases_columns {
                    if let Some(alias) = &alias {
                        builder = builder.alias(alias.clone())?;
                    }
                }

                // Avoid creating a duplicate Projection node, which would result in an additional
                // subquery if a projection already exists. For example, if the `optimize_projection`
                // rule is applied, there will be a Projection node, and duplicate projection
                // information included in the TableScan node.
                if !already_projected {
                    if let Some(project_vec) = &table_scan.projection {
                        if project_vec.is_empty() {
                            // An empty projection is emitted as the literal `1`.
                            builder = builder.project(vec![Expr::Literal(
                                ScalarValue::Int64(Some(1)),
                            )])?;
                        } else {
                            // Columns are qualified by the alias when present, otherwise
                            // by the table name; the choice is the same for every column.
                            let relation =
                                alias.as_ref().unwrap_or(&table_scan.table_name);
                            let project_columns = project_vec
                                .iter()
                                .map(|&i| {
                                    Column::new(
                                        Some(relation.clone()),
                                        table_schema.field(i).name().clone(),
                                    )
                                })
                                .collect::<Vec<_>>();
                            builder = builder.project(project_columns)?;
                        }
                    }
                }

                // AND all pushed-down filters together, rewriting each one through the
                // alias rewriter first when an alias is present. The first rewrite error
                // aborts the whole operation.
                let mut filter_alias_rewriter =
                    alias.as_ref().map(|alias_name| TableAliasRewriter {
                        table_schema: &table_schema,
                        alias_name: alias_name.clone(),
                    });
                let mut combined_filter: Option<Expr> = None;
                for expr in table_scan.filters.iter().cloned() {
                    let expr = match filter_alias_rewriter.as_mut() {
                        Some(rewriter) => expr.rewrite(rewriter).data()?,
                        None => expr,
                    };
                    combined_filter = Some(match combined_filter {
                        Some(acc) => acc.and(expr),
                        None => expr,
                    });
                }
                if let Some(filter) = combined_filter {
                    builder = builder.filter(filter)?;
                }

                if let Some(fetch) = table_scan.fetch {
                    builder = builder.limit(0, Some(fetch))?;
                }

                // If the table scan has an alias but no projection or filters, no column
                // references were rebased, so the alias is appended to the resulting subquery.
                // Example:
                //   select * from t1 limit 10 -> (select * from t1 limit 10) as a
                if !rebases_columns {
                    if let Some(alias) = alias {
                        builder = builder.alias(alias)?;
                    }
                }

                Ok(Some(builder.build()?))
            }
            LogicalPlan::SubqueryAlias(subquery_alias) => {
                let inner = Self::unparse_table_scan_pushdown(
                    &subquery_alias.input,
                    Some(subquery_alias.alias.clone()),
                    already_projected,
                )?;
                match (inner, alias) {
                    // The inner plan was rewritten and the caller supplied an alias:
                    // wrap the rewritten plan in that outer alias.
                    (Some(plan), Some(alias)) => Ok(Some(
                        LogicalPlanBuilder::new(plan).alias(alias)?.build()?,
                    )),
                    (inner, _) => Ok(inner),
                }
            }
            // SubqueryAlias could be rewritten to a plan with a projection as the top node by
            // [rewrite::subquery_alias_inner_query_and_columns].
            // The inner table scan could be a scan with pushdown operations.
            LogicalPlan::Projection(projection) => {
                let plan = match Self::unparse_table_scan_pushdown(
                    &projection.input,
                    alias.clone(),
                    already_projected,
                )? {
                    Some(plan) => plan,
                    None => return Ok(None),
                };

                let exprs = match alias {
                    Some(alias_name) => {
                        // The rewriter borrows the rewritten plan's schema, so it is kept
                        // in this inner scope and dropped before `plan` is moved below.
                        let mut rewriter = TableAliasRewriter {
                            table_schema: plan.schema().as_arrow(),
                            alias_name,
                        };
                        let rewritten = projection
                            .expr
                            .iter()
                            .cloned()
                            .map(|expr| expr.rewrite(&mut rewriter).data())
                            .collect::<Result<Vec<_>>>()?;
                        rewritten
                    }
                    None => projection.expr.clone(),
                };

                Ok(Some(
                    LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
                ))
            }
            _ => Ok(None),
        }
    }