fn f_up()

in datafusion/optimizer/src/decorrelate.rs [165:375]


    /// Rewrites a single node of a subquery plan, moving correlated filter
    /// expressions into `self.join_filters` and recording per-plan bookkeeping
    /// in `self.correlated_subquery_cols_map` / `self.collected_count_expr_map`.
    ///
    /// NOTE(review): presumably this is the post-order ("bottom-up") hook of a
    /// tree rewriter, so a node's inputs have already been rewritten when it is
    /// visited — confirm against the enclosing `impl`, which is not visible here.
    ///
    /// Handling per plan variant:
    ///
    /// * `Filter` — splits the predicate into conjuncts, moves the ones
    ///   classified as join expressions by `find_join_exprs` into
    ///   `self.join_filters`, and rebuilds the node from its input with the
    ///   remaining conjuncts (or without a filter at all when the remaining
    ///   predicate is captured in `self.pull_up_having_expr`).
    /// * `Projection` / `Aggregate` — only when an IN predicate is set or at
    ///   least one join filter has been collected: rebuilds the node with the
    ///   expression list returned by `self.collect_missing_exprs`, plus an
    ///   `UN_MATCHED_ROW_INDICATOR` expression when the count-bug helpers report
    ///   affected expressions.
    /// * `SubqueryAlias` — leaves the plan unchanged, but re-qualifies the
    ///   correlated columns with the alias and forwards the input's count-bug map.
    /// * `Limit` — for an EXISTS subquery with collected join filters, replaces
    ///   `LIMIT 0` with an empty relation and drops any other limit; otherwise
    ///   leaves the plan unchanged. The input's count-bug map is forwarded.
    /// * anything else — returned unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any error from the helper functions and from
    /// `LogicalPlanBuilder`. The `Filter` arm additionally produces the
    /// `plan_err!("Unsupported Subquery plan")` result when a pull-up expression
    /// is found while `self.pull_up_having_expr` is already populated.
    fn f_up(&mut self, plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
        // Only the `Filter` arm reads this schema.
        let subquery_schema = plan.schema();
        match &plan {
            LogicalPlan::Filter(plan_filter) => {
                let subquery_filter_exprs = split_conjunction(&plan_filter.predicate);
                // The flag is sticky: once any conjunct that references the outer
                // query fails `can_pullup_over_aggregation`, it stays false.
                self.can_pull_over_aggregation = self.can_pull_over_aggregation
                    && subquery_filter_exprs
                        .iter()
                        .filter(|e| e.contains_outer())
                        .all(|&e| can_pullup_over_aggregation(e));
                // Partition the conjuncts into those to be pulled up as join
                // filters and those that remain inside the subquery.
                let (mut join_filters, subquery_filters) =
                    find_join_exprs(subquery_filter_exprs)?;
                if let Some(in_predicate) = &self.in_predicate_opt {
                    // The IN predicate may already be among the join filters; remove it first.
                    join_filters = remove_duplicated_filter(join_filters, in_predicate);
                }
                let correlated_subquery_cols =
                    collect_subquery_cols(&join_filters, subquery_schema)?;
                // Accumulate the join filters on the rewriter, skipping ones already collected.
                for expr in join_filters {
                    if !self.join_filters.contains(&expr) {
                        self.join_filters.push(expr)
                    }
                }

                // Passed to the helper below as an output parameter; this arm does
                // not read it afterwards.
                let mut expr_result_map_for_count_bug = HashMap::new();
                // A pull-up expression is only considered when the filter's input
                // has a recorded count-expression map and some conjuncts remain.
                let pull_up_expr_opt = if let Some(expr_result_map) =
                    self.collected_count_expr_map.get(plan_filter.input.deref())
                {
                    if let Some(expr) = conjunction(subquery_filters.clone()) {
                        filter_exprs_evaluation_result_on_empty_batch(
                            &expr,
                            Arc::clone(plan_filter.input.schema()),
                            expr_result_map,
                            &mut expr_result_map_for_count_bug,
                        )?
                    } else {
                        None
                    }
                } else {
                    None
                };

                match (&pull_up_expr_opt, &self.pull_up_having_expr) {
                    (Some(_), Some(_)) => {
                        // Error path: a pull-up expression was already recorded by
                        // an earlier visit, and only one can be stored.
                        plan_err!("Unsupported Subquery plan")
                    }
                    (Some(_), None) => {
                        // Remember the pull-up expression and replace the filter
                        // with its bare input; the remaining conjuncts are not
                        // re-applied in this branch.
                        self.pull_up_having_expr = pull_up_expr_opt;
                        let new_plan =
                            LogicalPlanBuilder::from((*plan_filter.input).clone())
                                .build()?;
                        // Key the correlated columns by the rewritten plan so later
                        // visits can look them up.
                        self.correlated_subquery_cols_map
                            .insert(new_plan.clone(), correlated_subquery_cols);
                        Ok(Transformed::yes(new_plan))
                    }
                    (None, _) => {
                        // If the subquery still has filter expressions, restore them.
                        let mut plan =
                            LogicalPlanBuilder::from((*plan_filter.input).clone());
                        if let Some(expr) = conjunction(subquery_filters) {
                            plan = plan.filter(expr)?
                        }
                        let new_plan = plan.build()?;
                        self.correlated_subquery_cols_map
                            .insert(new_plan.clone(), correlated_subquery_cols);
                        // Reported as transformed even when no conjunct was removed.
                        Ok(Transformed::yes(new_plan))
                    }
                }
            }
            // Projections are only rewritten once there is something to pull up:
            // an IN predicate or at least one collected join filter.
            LogicalPlan::Projection(projection)
                if self.in_predicate_opt.is_some() || !self.join_filters.is_empty() =>
            {
                let mut local_correlated_cols = BTreeSet::new();
                collect_local_correlated_cols(
                    &plan,
                    &self.correlated_subquery_cols_map,
                    &mut local_correlated_cols,
                );
                // Add missing columns to the Projection.
                // NOTE(review): `collect_missing_exprs` presumably returns the
                // original expressions plus the correlated columns not yet
                // projected — confirm against its definition.
                let mut missing_exprs =
                    self.collect_missing_exprs(&projection.expr, &local_correlated_cols)?;

                let mut expr_result_map_for_count_bug = HashMap::new();
                // Count-bug handling applies only when the input plan has a
                // recorded count-expression map.
                if let Some(expr_result_map) =
                    self.collected_count_expr_map.get(projection.input.deref())
                {
                    proj_exprs_evaluation_result_on_empty_batch(
                        &projection.expr,
                        projection.input.schema(),
                        expr_result_map,
                        &mut expr_result_map_for_count_bug,
                    )?;
                    if !expr_result_map_for_count_bug.is_empty() {
                        // Has count bug: reference the indicator column by its
                        // unqualified name.
                        let un_matched_row = Expr::Column(Column::new_unqualified(
                            UN_MATCHED_ROW_INDICATOR.to_string(),
                        ));
                        // Add the unmatched-rows indicator to the Projection expressions.
                        missing_exprs.push(un_matched_row);
                    }
                }

                let new_plan = LogicalPlanBuilder::from((*projection.input).clone())
                    .project(missing_exprs)?
                    .build()?;
                // Record the map under the rewritten plan so parent nodes can find it.
                if !expr_result_map_for_count_bug.is_empty() {
                    self.collected_count_expr_map
                        .insert(new_plan.clone(), expr_result_map_for_count_bug);
                }
                Ok(Transformed::yes(new_plan))
            }
            // Same guard as for Projection.
            LogicalPlan::Aggregate(aggregate)
                if self.in_predicate_opt.is_some() || !self.join_filters.is_empty() =>
            {
                // If the aggregation is from a distinct it will not change the result for
                // exists/in subqueries so we can still pull up all predicates.
                // An aggregate without aggregate expressions is treated as that case.
                let is_distinct = aggregate.aggr_expr.is_empty();
                if !is_distinct {
                    self.can_pull_up = self.can_pull_up && self.can_pull_over_aggregation;
                }
                let mut local_correlated_cols = BTreeSet::new();
                collect_local_correlated_cols(
                    &plan,
                    &self.correlated_subquery_cols_map,
                    &mut local_correlated_cols,
                );
                // Add missing columns to the Aggregation's group expressions.
                let mut missing_exprs = self.collect_missing_exprs(
                    &aggregate.group_expr,
                    &local_correlated_cols,
                )?;

                // If the original group expressions are empty, the count bug needs
                // handling. The check also requires `self.need_handle_count_bug`
                // and a non-empty `missing_exprs`.
                let mut expr_result_map_for_count_bug = HashMap::new();
                if self.need_handle_count_bug
                    && aggregate.group_expr.is_empty()
                    && !missing_exprs.is_empty()
                {
                    agg_exprs_evaluation_result_on_empty_batch(
                        &aggregate.aggr_expr,
                        aggregate.input.schema(),
                        &mut expr_result_map_for_count_bug,
                    )?;
                    if !expr_result_map_for_count_bug.is_empty() {
                        // Has count bug: the indicator is the literal `true`
                        // aliased to the indicator column name.
                        let un_matched_row = lit(true).alias(UN_MATCHED_ROW_INDICATOR);
                        // Add the unmatched-rows indicator to the Aggregation's group expressions.
                        missing_exprs.push(un_matched_row);
                    }
                }
                // Rebuild the aggregate with the extended grouping list and the
                // original aggregate expressions.
                let new_plan = LogicalPlanBuilder::from((*aggregate.input).clone())
                    .aggregate(missing_exprs, aggregate.aggr_expr.to_vec())?
                    .build()?;
                if !expr_result_map_for_count_bug.is_empty() {
                    self.collected_count_expr_map
                        .insert(new_plan.clone(), expr_result_map_for_count_bug);
                }
                Ok(Transformed::yes(new_plan))
            }
            LogicalPlan::SubqueryAlias(alias) => {
                let mut local_correlated_cols = BTreeSet::new();
                collect_local_correlated_cols(
                    &plan,
                    &self.correlated_subquery_cols_map,
                    &mut local_correlated_cols,
                );
                // Re-qualify each correlated column with the alias, keeping its name.
                let mut new_correlated_cols = BTreeSet::new();
                for col in local_correlated_cols.iter() {
                    new_correlated_cols
                        .insert(Column::new(Some(alias.alias.clone()), col.name.clone()));
                }
                self.correlated_subquery_cols_map
                    .insert(plan.clone(), new_correlated_cols);
                // Forward the input's count-expression map, if any, to the alias node.
                if let Some(input_map) =
                    self.collected_count_expr_map.get(alias.input.deref())
                {
                    self.collected_count_expr_map
                        .insert(plan.clone(), input_map.clone());
                }
                // The alias node itself is returned unchanged.
                Ok(Transformed::no(plan))
            }
            LogicalPlan::Limit(limit) => {
                // Cloned up front so no borrow of the map is held while `plan`
                // is moved and the map is updated below.
                let input_expr_map = self
                    .collected_count_expr_map
                    .get(limit.input.deref())
                    .cloned();
                // Handle the limit clause in the subquery.
                let new_plan = match (self.exists_sub_query, self.join_filters.is_empty())
                {
                    // Correlated EXISTS subquery: remove the limit (so that correlated expressions can pull up).
                    (true, false) => Transformed::yes(match limit.get_fetch_type()? {
                        // A literal fetch of 0 becomes an empty relation that
                        // produces no rows and keeps the input's schema.
                        FetchType::Literal(Some(0)) => {
                            LogicalPlan::EmptyRelation(EmptyRelation {
                                produce_one_row: false,
                                schema: Arc::clone(limit.input.schema()),
                            })
                        }
                        // Any other fetch: replace the limit with its input.
                        _ => LogicalPlanBuilder::from((*limit.input).clone()).build()?,
                    }),
                    // Not an EXISTS subquery, or no join filters collected: keep the limit.
                    _ => Transformed::no(plan),
                };
                // Forward the input's count-expression map to whichever plan is returned.
                if let Some(input_map) = input_expr_map {
                    self.collected_count_expr_map
                        .insert(new_plan.data.clone(), input_map);
                }
                Ok(new_plan)
            }
            // Every other plan variant (and guarded variants whose guard failed)
            // passes through untouched.
            _ => Ok(Transformed::no(plan)),
        }
    }