fn mutate()

in datafusion/optimizer/src/decorrelate.rs [95:298]


    /// Rewrites a single node of a subquery plan, pulling correlated
    /// expressions up into `self` and returning the (possibly replaced) node.
    ///
    /// Behavior per plan variant:
    ///
    /// * `Filter` - splits the predicate into join filters and remaining
    ///   subquery filters. Join filters are appended to `self.join_filters`
    ///   (skipping duplicates). The returned plan is the filter's input,
    ///   re-wrapped in a filter over the remaining predicates if any are left
    ///   and no expression was pulled up into `self.pull_up_having_expr`.
    /// * `Projection` / `Aggregate` - only handled when `self.in_predicate_opt`
    ///   is set or `self.join_filters` is non-empty. The node is rebuilt over
    ///   its original input using the expression list returned by
    ///   `collect_missing_exprs`, optionally extended with the
    ///   `UN_MATCHED_ROW_INDICATOR` expression.
    /// * `SubqueryAlias` - the node is returned unchanged; the correlated
    ///   columns reachable from it are re-qualified with the alias and recorded.
    /// * `Limit` - for an exists subquery with pending join filters the limit
    ///   is removed (or replaced by an empty relation when `fetch` is 0);
    ///   otherwise the node is returned unchanged.
    /// * Anything else is returned unchanged.
    ///
    /// Side effects: may update `self.join_filters`,
    /// `self.pull_up_having_expr`, `self.correlated_subquery_cols_map` and
    /// `self.collected_count_expr_map`.
    ///
    /// NOTE(review): the map lookups keyed by a node's *input* suggest that
    /// inputs are visited before their parents - confirm against the
    /// traversal that drives this method.
    ///
    /// # Errors
    ///
    /// Returns the `plan_err!("Unsupported Subquery plan")` error when a
    /// `Filter` yields a pull-up expression while `self.pull_up_having_expr`
    /// is already set, and propagates any error from the helper functions and
    /// from `LogicalPlanBuilder`.
    fn mutate(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> {
        // Captured up front for every variant, but only consumed by the
        // `Filter` arm (passed to `collect_subquery_cols`).
        let subquery_schema = plan.schema().clone();
        match &plan {
            LogicalPlan::Filter(plan_filter) => {
                // Break the predicate into its AND-ed parts and partition them
                // into join filters and the filters that stay in the subquery.
                let subquery_filter_exprs = split_conjunction(&plan_filter.predicate);
                let (mut join_filters, subquery_filters) =
                    find_join_exprs(subquery_filter_exprs)?;
                if let Some(in_predicate) = &self.in_predicate_opt {
                    // in_predicate may be already included in the join filters, remove it from the join filters first.
                    join_filters = remove_duplicated_filter(join_filters, in_predicate);
                }
                // Computed from the join filters found at this node; stored
                // below under the plan that replaces this filter.
                let correlated_subquery_cols =
                    collect_subquery_cols(&join_filters, subquery_schema)?;
                // Accumulate the join filters on the rewriter, skipping any
                // expression that is already recorded.
                for expr in join_filters {
                    if !self.join_filters.contains(&expr) {
                        self.join_filters.push(expr)
                    }
                }

                // Scratch map handed to the evaluation helper; it is not read
                // again in this arm after the call.
                let mut expr_result_map_for_count_bug = HashMap::new();
                // Only attempt the empty-batch evaluation when the filter's
                // input has an entry in `collected_count_expr_map` and at
                // least one non-join filter remains.
                let pull_up_expr_opt = if let Some(expr_result_map) =
                    self.collected_count_expr_map.get(plan_filter.input.deref())
                {
                    if let Some(expr) = conjunction(subquery_filters.clone()) {
                        filter_exprs_evaluation_result_on_empty_batch(
                            &expr,
                            plan_filter.input.schema().clone(),
                            expr_result_map,
                            &mut expr_result_map_for_count_bug,
                        )?
                    } else {
                        None
                    }
                } else {
                    None
                };

                match (&pull_up_expr_opt, &self.pull_up_having_expr) {
                    (Some(_), Some(_)) => {
                        // A pull-up expression was already recorded by an
                        // earlier call; a second one is rejected.
                        plan_err!("Unsupported Subquery plan")
                    }
                    (Some(_), None) => {
                        // Record the pull-up expression and drop this filter
                        // entirely: the replacement plan is the filter's input.
                        self.pull_up_having_expr = pull_up_expr_opt;
                        let new_plan =
                            LogicalPlanBuilder::from((*plan_filter.input).clone())
                                .build()?;
                        self.correlated_subquery_cols_map
                            .insert(new_plan.clone(), correlated_subquery_cols);
                        Ok(new_plan)
                    }
                    (None, _) => {
                        // if the subquery still has filter expressions, restore them.
                        let mut plan =
                            LogicalPlanBuilder::from((*plan_filter.input).clone());
                        if let Some(expr) = conjunction(subquery_filters) {
                            plan = plan.filter(expr)?
                        }
                        let new_plan = plan.build()?;
                        self.correlated_subquery_cols_map
                            .insert(new_plan.clone(), correlated_subquery_cols);
                        Ok(new_plan)
                    }
                }
            }
            // Projections are only rewritten while there is something to pull
            // up (an IN predicate or previously collected join filters).
            LogicalPlan::Projection(projection)
                if self.in_predicate_opt.is_some() || !self.join_filters.is_empty() =>
            {
                let mut local_correlated_cols = BTreeSet::new();
                collect_local_correlated_cols(
                    &plan,
                    &self.correlated_subquery_cols_map,
                    &mut local_correlated_cols,
                );
                // add missing columns to Projection
                // NOTE(review): the result is used as the complete expression
                // list of the rebuilt projection, so it presumably contains
                // the original expressions too - confirm in
                // `collect_missing_exprs`.
                let mut missing_exprs =
                    self.collect_missing_exprs(&projection.expr, &local_correlated_cols)?;

                let mut expr_result_map_for_count_bug = HashMap::new();
                if let Some(expr_result_map) =
                    self.collected_count_expr_map.get(projection.input.deref())
                {
                    proj_exprs_evaluation_result_on_empty_batch(
                        &projection.expr,
                        projection.input.schema().clone(),
                        expr_result_map,
                        &mut expr_result_map_for_count_bug,
                    )?;
                    if !expr_result_map_for_count_bug.is_empty() {
                        // has count bug
                        let un_matched_row = Expr::Column(Column::new_unqualified(
                            UN_MATCHED_ROW_INDICATOR.to_string(),
                        ));
                        // add the unmatched rows indicator to the Projection expressions
                        missing_exprs.push(un_matched_row);
                    }
                }

                // Rebuild the projection over the original input with the
                // extended expression list.
                let new_plan = LogicalPlanBuilder::from((*projection.input).clone())
                    .project(missing_exprs)?
                    .build()?;
                // Record the map under the new plan so that later calls which
                // look up their input in `collected_count_expr_map` find it.
                if !expr_result_map_for_count_bug.is_empty() {
                    self.collected_count_expr_map
                        .insert(new_plan.clone(), expr_result_map_for_count_bug);
                }
                Ok(new_plan)
            }
            // Same guard as for projections.
            LogicalPlan::Aggregate(aggregate)
                if self.in_predicate_opt.is_some() || !self.join_filters.is_empty() =>
            {
                let mut local_correlated_cols = BTreeSet::new();
                collect_local_correlated_cols(
                    &plan,
                    &self.correlated_subquery_cols_map,
                    &mut local_correlated_cols,
                );
                // add missing columns to Aggregation's group expressions
                let mut missing_exprs = self.collect_missing_exprs(
                    &aggregate.group_expr,
                    &local_correlated_cols,
                )?;

                // if the original group expressions are empty, need to handle the Count bug
                // (only when the rewriter was asked to, and only when grouping
                // columns were actually added above)
                let mut expr_result_map_for_count_bug = HashMap::new();
                if self.need_handle_count_bug
                    && aggregate.group_expr.is_empty()
                    && !missing_exprs.is_empty()
                {
                    agg_exprs_evaluation_result_on_empty_batch(
                        &aggregate.aggr_expr,
                        aggregate.input.schema().clone(),
                        &mut expr_result_map_for_count_bug,
                    )?;
                    if !expr_result_map_for_count_bug.is_empty() {
                        // has count bug
                        // The indicator is a literal `true` aliased to
                        // UN_MATCHED_ROW_INDICATOR.
                        let un_matched_row = Expr::Alias(Alias::new(
                            Expr::Literal(ScalarValue::Boolean(Some(true))),
                            UN_MATCHED_ROW_INDICATOR.to_string(),
                        ));
                        // add the unmatched rows indicator to the Aggregation's group expressions
                        missing_exprs.push(un_matched_row);
                    }
                }
                // Rebuild the aggregate over the original input: new group
                // expressions, original aggregate expressions.
                let new_plan = LogicalPlanBuilder::from((*aggregate.input).clone())
                    .aggregate(missing_exprs, aggregate.aggr_expr.to_vec())?
                    .build()?;
                if !expr_result_map_for_count_bug.is_empty() {
                    self.collected_count_expr_map
                        .insert(new_plan.clone(), expr_result_map_for_count_bug);
                }
                Ok(new_plan)
            }
            LogicalPlan::SubqueryAlias(alias) => {
                let mut local_correlated_cols = BTreeSet::new();
                collect_local_correlated_cols(
                    &plan,
                    &self.correlated_subquery_cols_map,
                    &mut local_correlated_cols,
                );
                // Re-qualify every collected column with this alias, keeping
                // only the column name.
                let mut new_correlated_cols = BTreeSet::new();
                for col in local_correlated_cols.iter() {
                    new_correlated_cols
                        .insert(Column::new(Some(alias.alias.clone()), col.name.clone()));
                }
                self.correlated_subquery_cols_map
                    .insert(plan.clone(), new_correlated_cols);
                // Carry the input's map entry over to the alias node itself.
                if let Some(input_map) =
                    self.collected_count_expr_map.get(alias.input.deref())
                {
                    self.collected_count_expr_map
                        .insert(plan.clone(), input_map.clone());
                }
                // The alias node itself is not modified.
                Ok(plan)
            }
            LogicalPlan::Limit(limit) => {
                // Cloned up front so the entry can be re-registered under
                // whichever plan this arm returns.
                let input_expr_map = self
                    .collected_count_expr_map
                    .get(limit.input.deref())
                    .cloned();
                // handling the limit clause in the subquery
                let new_plan = match (self.exists_sub_query, self.join_filters.is_empty())
                {
                    // Correlated exist subquery, remove the limit(so that correlated expressions can pull up)
                    (true, false) => {
                        // `fetch == Some(0)`: replace the limit with an empty
                        // relation that has the input's schema and produces
                        // no rows.
                        if limit.fetch.filter(|limit_row| *limit_row == 0).is_some() {
                            LogicalPlan::EmptyRelation(EmptyRelation {
                                produce_one_row: false,
                                schema: limit.input.schema().clone(),
                            })
                        } else {
                            // Any other fetch value: drop the limit and
                            // return its input.
                            LogicalPlanBuilder::from((*limit.input).clone()).build()?
                        }
                    }
                    // Every other combination keeps the limit untouched.
                    _ => plan,
                };
                if let Some(input_map) = input_expr_map {
                    self.collected_count_expr_map
                        .insert(new_plan.clone(), input_map);
                }
                Ok(new_plan)
            }
            // All other plan nodes (including Projection/Aggregate when their
            // guard is false) pass through unchanged.
            _ => Ok(plan),
        }
    }