in datafusion/sql/src/unparser/plan.rs [1041:1195]
fn unparse_table_scan_pushdown(
plan: &LogicalPlan,
alias: Option<TableReference>,
already_projected: bool,
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::TableScan(table_scan) => {
if !Self::is_scan_with_pushdown(table_scan) {
return Ok(None);
}
let table_schema = table_scan.source.schema();
let mut filter_alias_rewriter =
alias.as_ref().map(|alias_name| TableAliasRewriter {
table_schema: &table_schema,
alias_name: alias_name.clone(),
});
let mut builder = LogicalPlanBuilder::scan(
table_scan.table_name.clone(),
Arc::clone(&table_scan.source),
None,
)?;
// We will rebase the column references to the new alias if it exists.
// If the projection or filters are empty, we will append alias to the table scan.
//
// Example:
// select t1.c1 from t1 where t1.c1 > 1 -> select a.c1 from t1 as a where a.c1 > 1
if let Some(ref alias) = alias {
if table_scan.projection.is_some() || !table_scan.filters.is_empty() {
builder = builder.alias(alias.clone())?;
}
}
// Avoid creating a duplicate Projection node, which would result in an additional subquery if a projection already exists.
// For example, if the `optimize_projection` rule is applied, there will be a Projection node, and duplicate projection
// information included in the TableScan node.
if !already_projected {
if let Some(project_vec) = &table_scan.projection {
if project_vec.is_empty() {
builder = builder.project(vec![Expr::Literal(
ScalarValue::Int64(Some(1)),
)])?;
} else {
let project_columns = project_vec
.iter()
.cloned()
.map(|i| {
let schema = table_scan.source.schema();
let field = schema.field(i);
if alias.is_some() {
Column::new(alias.clone(), field.name().clone())
} else {
Column::new(
Some(table_scan.table_name.clone()),
field.name().clone(),
)
}
})
.collect::<Vec<_>>();
builder = builder.project(project_columns)?;
};
}
}
let filter_expr: Result<Option<Expr>> = table_scan
.filters
.iter()
.cloned()
.map(|expr| {
if let Some(ref mut rewriter) = filter_alias_rewriter {
expr.rewrite(rewriter).data()
} else {
Ok(expr)
}
})
.reduce(|acc, expr_result| {
acc.and_then(|acc_expr| {
expr_result.map(|expr| acc_expr.and(expr))
})
})
.transpose();
if let Some(filter) = filter_expr? {
builder = builder.filter(filter)?;
}
if let Some(fetch) = table_scan.fetch {
builder = builder.limit(0, Some(fetch))?;
}
// If the table scan has an alias but no projection or filters, it means no column references are rebased.
// So we will append the alias to this subquery.
// Example:
// select * from t1 limit 10 -> (select * from t1 limit 10) as a
if let Some(alias) = alias {
if table_scan.projection.is_none() && table_scan.filters.is_empty() {
builder = builder.alias(alias)?;
}
}
Ok(Some(builder.build()?))
}
LogicalPlan::SubqueryAlias(subquery_alias) => {
let ret = Self::unparse_table_scan_pushdown(
&subquery_alias.input,
Some(subquery_alias.alias.clone()),
already_projected,
)?;
if let Some(alias) = alias {
if let Some(plan) = ret {
let plan = LogicalPlanBuilder::new(plan).alias(alias)?.build()?;
return Ok(Some(plan));
}
}
Ok(ret)
}
// SubqueryAlias could be rewritten to a plan with a projection as the top node by [rewrite::subquery_alias_inner_query_and_columns].
// The inner table scan could be a scan with pushdown operations.
LogicalPlan::Projection(projection) => {
if let Some(plan) = Self::unparse_table_scan_pushdown(
&projection.input,
alias.clone(),
already_projected,
)? {
let exprs = if alias.is_some() {
let mut alias_rewriter =
alias.as_ref().map(|alias_name| TableAliasRewriter {
table_schema: plan.schema().as_arrow(),
alias_name: alias_name.clone(),
});
projection
.expr
.iter()
.cloned()
.map(|expr| {
if let Some(ref mut rewriter) = alias_rewriter {
expr.rewrite(rewriter).data()
} else {
Ok(expr)
}
})
.collect::<Result<Vec<_>>>()?
} else {
projection.expr.clone()
};
Ok(Some(
LogicalPlanBuilder::from(plan).project(exprs)?.build()?,
))
} else {
Ok(None)
}
}
_ => Ok(None),
}
}