fn try_optimize()

in datafusion/optimizer/src/scalar_subquery_to_join.rs [64:186]


    /// Rewrites scalar subqueries found in a `Filter` predicate or in the
    /// expressions of a `Projection` into joins on that node's input.
    ///
    /// Subqueries, together with rewritten expressions that refer to them by
    /// alias, are obtained from `extract_subquery_exprs`. Each subquery is then
    /// passed to `build_join` along with the current input. On success the
    /// returned plan becomes the input for the next subquery, so several
    /// subqueries are chained one after another.
    ///
    /// When `build_join` also returns a non-empty check map, every column in
    /// the affected rewritten expression whose name is a key of that map is
    /// replaced by the mapped expression. For a projection, "affected" means
    /// only the expression the subquery was extracted from.
    ///
    /// Returns `Ok(None)`, leaving the plan unchanged, when:
    /// - the node is neither a filter nor a projection,
    /// - the node contains no subqueries, or
    /// - `build_join` declines any one of its subqueries.
    ///
    /// # Errors
    ///
    /// Propagates any error from subquery extraction, `build_join`, expression
    /// rewriting, `display_name`, or building the new plan.
    fn try_optimize(
        &self,
        plan: &LogicalPlan,
        config: &dyn OptimizerConfig,
    ) -> Result<Option<LogicalPlan>> {
        match plan {
            LogicalPlan::Filter(filter) => {
                let (subqueries, mut rewrite_expr) = self.extract_subquery_exprs(
                    &filter.predicate,
                    config.alias_generator(),
                )?;

                if subqueries.is_empty() {
                    // regular filter, no scalar subquery in the predicate
                    return Ok(None);
                }

                // iterate through all subqueries in predicate, turning each into a left join
                let mut cur_input = filter.input.as_ref().clone();
                for (subquery, alias) in subqueries {
                    if let Some((optimized_subquery, expr_check_map)) =
                        build_join(&subquery, &cur_input, &alias)?
                    {
                        if !expr_check_map.is_empty() {
                            // All subqueries of a filter feed the same predicate,
                            // so each check map is applied to the whole predicate.
                            rewrite_expr = rewrite_expr.transform_up(&|expr| {
                                if let Expr::Column(col) = &expr {
                                    if let Some(map_expr) = expr_check_map.get(&col.name)
                                    {
                                        Ok(Transformed::Yes(map_expr.clone()))
                                    } else {
                                        Ok(Transformed::No(expr))
                                    }
                                } else {
                                    Ok(Transformed::No(expr))
                                }
                            })?;
                        }
                        cur_input = optimized_subquery;
                    } else {
                        // if we can't handle all of the subqueries then bail for now
                        return Ok(None);
                    }
                }
                let new_plan = LogicalPlanBuilder::from(cur_input)
                    .filter(rewrite_expr)?
                    .build()?;
                Ok(Some(new_plan))
            }
            LogicalPlan::Projection(projection) => {
                // `rewrite_exprs[i]` is the rewritten form of `projection.expr[i]`.
                // Every extracted subquery is tagged with the index of the
                // expression it came from. Tracking positions (rather than keying
                // maps by subquery or expression) keeps each subquery tied to its
                // own expression even when equal subqueries or equal expressions
                // occur more than once in the projection.
                let mut rewrite_exprs = Vec::with_capacity(projection.expr.len());
                let mut all_subqueries = vec![];
                for (expr_idx, expr) in projection.expr.iter().enumerate() {
                    let (subqueries, rewrite_expr) =
                        self.extract_subquery_exprs(expr, config.alias_generator())?;
                    all_subqueries.extend(
                        subqueries
                            .into_iter()
                            .map(|(subquery, alias)| (subquery, alias, expr_idx)),
                    );
                    rewrite_exprs.push(rewrite_expr);
                }
                if all_subqueries.is_empty() {
                    // regular projection, no scalar subquery in any expression
                    return Ok(None);
                }

                // iterate through all subqueries in projection, turning each into a left join
                let mut cur_input = projection.input.as_ref().clone();
                for (subquery, alias, expr_idx) in all_subqueries {
                    if let Some((optimized_subquery, expr_check_map)) =
                        build_join(&subquery, &cur_input, &alias)?
                    {
                        cur_input = optimized_subquery;
                        if !expr_check_map.is_empty() {
                            // Only the expression this subquery was extracted from
                            // is rewritten with its check map.
                            // `expr_idx < rewrite_exprs.len()` because one entry
                            // was pushed per projection expression above.
                            rewrite_exprs[expr_idx] = rewrite_exprs[expr_idx]
                                .clone()
                                .transform_up(&|expr| {
                                    if let Expr::Column(col) = &expr {
                                        if let Some(map_expr) =
                                            expr_check_map.get(&col.name)
                                        {
                                            Ok(Transformed::Yes(map_expr.clone()))
                                        } else {
                                            Ok(Transformed::No(expr))
                                        }
                                    } else {
                                        Ok(Transformed::No(expr))
                                    }
                                })?;
                        }
                    } else {
                        // if we can't handle all of the subqueries then bail for now
                        return Ok(None);
                    }
                }

                // Where the rewrite changed an expression's display name, alias it
                // back to the original name. This keeps the projection's output
                // column names identical to those before the rewrite.
                let mut proj_exprs = Vec::with_capacity(rewrite_exprs.len());
                for (old_expr, new_expr) in projection.expr.iter().zip(rewrite_exprs) {
                    let old_expr_name = old_expr.display_name()?;
                    let new_expr_name = new_expr.display_name()?;
                    if new_expr_name != old_expr_name {
                        proj_exprs.push(new_expr.alias(old_expr_name));
                    } else {
                        proj_exprs.push(new_expr);
                    }
                }
                let new_plan = LogicalPlanBuilder::from(cur_input)
                    .project(proj_exprs)?
                    .build()?;
                Ok(Some(new_plan))
            }

            _ => Ok(None),
        }
    }