in datafusion/optimizer/src/scalar_subquery_to_join.rs [64:186]
/// Rewrites the subqueries found in a `Filter` predicate or in `Projection`
/// expressions into joins against the plan's input.
///
/// For both plan kinds the steps are the same:
/// 1. `extract_subquery_exprs` pulls the subqueries out of the expression(s)
///    and returns, for each expression, a rewritten expression together with
///    `(subquery, alias)` pairs.
/// 2. Each subquery is folded into the current input via `build_join`, which
///    also yields a map from column name to replacement expression.
/// 3. When that map is non-empty, every `Expr::Column` in the rewritten
///    expression whose `name` is a key of the map is replaced by the mapped
///    expression.
/// 4. A new `Filter` / `Projection` is built on top of the joined input.
///
/// # Returns
/// * `Ok(Some(plan))` with the rewritten plan,
/// * `Ok(None)` when the plan is neither a `Filter` nor a `Projection`, when
///   no subqueries were extracted, or when `build_join` returns `None` for
///   any subquery (the rewrite is all-or-nothing),
/// * `Err(_)` when any of the called helpers fails.
fn try_optimize(
    &self,
    plan: &LogicalPlan,
    config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
    match plan {
        LogicalPlan::Filter(filter) => {
            let (subqueries, mut rewrite_expr) = self.extract_subquery_exprs(
                &filter.predicate,
                config.alias_generator(),
            )?;
            if subqueries.is_empty() {
                // plain filter: no subquery was extracted from the predicate
                return Ok(None);
            }

            // fold every subquery of the predicate into the input, one join at a time
            let mut cur_input = filter.input.as_ref().clone();
            for (subquery, alias) in subqueries {
                let (optimized_subquery, expr_check_map) =
                    match build_join(&subquery, &cur_input, &alias)? {
                        Some(joined) => joined,
                        // if we can't handle all of the subqueries then bail for now
                        None => return Ok(None),
                    };

                if !expr_check_map.is_empty() {
                    // `rewrite_expr` is owned, so it is moved into the
                    // transform and reassigned instead of being cloned first.
                    rewrite_expr = rewrite_expr.transform_up(&|expr| {
                        // columns are matched by name only
                        let replacement = match &expr {
                            Expr::Column(col) => expr_check_map.get(&col.name).cloned(),
                            _ => None,
                        };
                        Ok(match replacement {
                            Some(mapped) => Transformed::Yes(mapped),
                            None => Transformed::No(expr),
                        })
                    })?;
                }
                cur_input = optimized_subquery;
            }

            let new_plan = LogicalPlanBuilder::from(cur_input)
                .filter(rewrite_expr)?
                .build()?;
            Ok(Some(new_plan))
        }
        LogicalPlan::Projection(projection) => {
            let mut all_subqueries = vec![];
            // original projection expression -> its current rewritten form
            let mut expr_to_rewrite_expr_map = HashMap::new();
            // subquery -> the projection expression it was extracted from
            // NOTE(review): keyed by subquery equality, so if equal subqueries
            // occur in several projection expressions only the last one
            // recorded is found by the lookup below — confirm this is intended.
            let mut subquery_to_expr_map = HashMap::new();
            for expr in projection.expr.iter() {
                let (subqueries, rewrite_expr) =
                    self.extract_subquery_exprs(expr, config.alias_generator())?;
                for (subquery, _) in &subqueries {
                    subquery_to_expr_map.insert(subquery.clone(), expr.clone());
                }
                all_subqueries.extend(subqueries);
                expr_to_rewrite_expr_map.insert(expr, rewrite_expr);
            }
            if all_subqueries.is_empty() {
                // plain projection: no subquery was extracted from any expression
                return Ok(None);
            }

            // fold every subquery of the projection into the input, one join at a time
            let mut cur_input = projection.input.as_ref().clone();
            for (subquery, alias) in all_subqueries {
                let (optimized_subquery, expr_check_map) =
                    match build_join(&subquery, &cur_input, &alias)? {
                        Some(joined) => joined,
                        // if we can't handle all of the subqueries then bail for now
                        None => return Ok(None),
                    };
                cur_input = optimized_subquery;
                if expr_check_map.is_empty() {
                    continue;
                }

                // locate the projection expression this subquery came from and
                // apply the column replacements to its rewritten form
                if let Some(expr) = subquery_to_expr_map.get(&subquery) {
                    if let Some(rewrite_expr) = expr_to_rewrite_expr_map.get(expr) {
                        // the value is borrowed from the map, so a clone is
                        // required before the consuming transform
                        let new_expr = rewrite_expr.clone().transform_up(&|expr| {
                            // columns are matched by name only
                            let replacement = match &expr {
                                Expr::Column(col) => {
                                    expr_check_map.get(&col.name).cloned()
                                }
                                _ => None,
                            };
                            Ok(match replacement {
                                Some(mapped) => Transformed::Yes(mapped),
                                None => Transformed::No(expr),
                            })
                        })?;
                        expr_to_rewrite_expr_map.insert(expr, new_expr);
                    }
                }
            }

            let mut proj_exprs = Vec::with_capacity(projection.expr.len());
            for expr in projection.expr.iter() {
                let old_expr_name = expr.display_name()?;
                let new_expr = expr_to_rewrite_expr_map
                    .get(expr)
                    .expect("every projection expression was registered in the rewrite map");
                let new_expr_name = new_expr.display_name()?;
                if new_expr_name != old_expr_name {
                    // alias back to the original name so the output column name is kept
                    proj_exprs.push(new_expr.clone().alias(old_expr_name));
                } else {
                    proj_exprs.push(new_expr.clone());
                }
            }

            let new_plan = LogicalPlanBuilder::from(cur_input)
                .project(proj_exprs)?
                .build()?;
            Ok(Some(new_plan))
        }
        _ => Ok(None),
    }
}