in datafusion/optimizer/src/scalar_subquery_to_join.rs [77:209]
fn rewrite(
&self,
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
// Optimization: skip the rest of the rule and its copies if
// there are no scalar subqueries
if !contains_scalar_subquery(&filter.predicate) {
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}
let (subqueries, mut rewrite_expr) = self.extract_subquery_exprs(
&filter.predicate,
config.alias_generator(),
)?;
if subqueries.is_empty() {
return internal_err!("Expected subqueries not found in filter");
}
// iterate through all subqueries in predicate, turning each into a left join
let mut cur_input = filter.input.as_ref().clone();
for (subquery, alias) in subqueries {
if let Some((optimized_subquery, expr_check_map)) =
build_join(&subquery, &cur_input, &alias)?
{
if !expr_check_map.is_empty() {
rewrite_expr = rewrite_expr
.transform_up(|expr| {
// replace column references with entry in map, if it exists
if let Some(map_expr) = expr
.try_as_col()
.and_then(|col| expr_check_map.get(&col.name))
{
Ok(Transformed::yes(map_expr.clone()))
} else {
Ok(Transformed::no(expr))
}
})
.data()?;
}
cur_input = optimized_subquery;
} else {
// if we can't handle all of the subqueries then bail for now
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}
}
let new_plan = LogicalPlanBuilder::from(cur_input)
.filter(rewrite_expr)?
.build()?;
Ok(Transformed::yes(new_plan))
}
LogicalPlan::Projection(projection) => {
// Optimization: skip the rest of the rule and its copies if
// there are no scalar subqueries
if !projection.expr.iter().any(contains_scalar_subquery) {
return Ok(Transformed::no(LogicalPlan::Projection(projection)));
}
let mut all_subqueries = vec![];
let mut expr_to_rewrite_expr_map = HashMap::new();
let mut subquery_to_expr_map = HashMap::new();
for expr in projection.expr.iter() {
let (subqueries, rewrite_exprs) =
self.extract_subquery_exprs(expr, config.alias_generator())?;
for (subquery, _) in &subqueries {
subquery_to_expr_map.insert(subquery.clone(), expr.clone());
}
all_subqueries.extend(subqueries);
expr_to_rewrite_expr_map.insert(expr, rewrite_exprs);
}
if all_subqueries.is_empty() {
return internal_err!("Expected subqueries not found in projection");
}
// iterate through all subqueries in predicate, turning each into a left join
let mut cur_input = projection.input.as_ref().clone();
for (subquery, alias) in all_subqueries {
if let Some((optimized_subquery, expr_check_map)) =
build_join(&subquery, &cur_input, &alias)?
{
cur_input = optimized_subquery;
if !expr_check_map.is_empty() {
if let Some(expr) = subquery_to_expr_map.get(&subquery) {
if let Some(rewrite_expr) =
expr_to_rewrite_expr_map.get(expr)
{
let new_expr = rewrite_expr
.clone()
.transform_up(|expr| {
// replace column references with entry in map, if it exists
if let Some(map_expr) =
expr.try_as_col().and_then(|col| {
expr_check_map.get(&col.name)
})
{
Ok(Transformed::yes(map_expr.clone()))
} else {
Ok(Transformed::no(expr))
}
})
.data()?;
expr_to_rewrite_expr_map.insert(expr, new_expr);
}
}
}
} else {
// if we can't handle all of the subqueries then bail for now
return Ok(Transformed::no(LogicalPlan::Projection(projection)));
}
}
let mut proj_exprs = vec![];
for expr in projection.expr.iter() {
let old_expr_name = expr.schema_name().to_string();
let new_expr = expr_to_rewrite_expr_map.get(expr).unwrap();
let new_expr_name = new_expr.schema_name().to_string();
if new_expr_name != old_expr_name {
proj_exprs.push(new_expr.clone().alias(old_expr_name))
} else {
proj_exprs.push(new_expr.clone());
}
}
let new_plan = LogicalPlanBuilder::from(cur_input)
.project(proj_exprs)?
.build()?;
Ok(Transformed::yes(new_plan))
}
plan => Ok(Transformed::no(plan)),
}
}