in datafusion/optimizer/src/decorrelate.rs [165:375]
/// Rewrites a single node of the subquery plan, moving correlated predicates
/// out of the plan and into `self.join_filters`, and recording the
/// bookkeeping that the other arms of this method consult.
///
/// NOTE(review): the name suggests this is the bottom-up hook of a tree
/// rewriter (called after a node's inputs were rewritten) — confirm against
/// the enclosing `impl`, which is not visible here.
///
/// Per node kind:
/// * `Filter` — splits the predicate, appends the join conjuncts to
///   `self.join_filters` (skipping duplicates) and rebuilds the node from
///   whatever conjuncts remain, or drops it when they are pulled up as
///   `self.pull_up_having_expr`.
/// * `Projection` / `Aggregate` — only when an IN predicate is present or at
///   least one join filter has been collected: rebuilds the node with the
///   expression list returned by `collect_missing_exprs`, plus the
///   `UN_MATCHED_ROW_INDICATOR` column when count-bug handling applies.
/// * `SubqueryAlias` — left unchanged; correlated columns and the count-bug
///   map are re-registered under the alias node.
/// * `Limit` — removed for an EXISTS subquery that has join filters (a literal
///   fetch of 0 becomes an `EmptyRelation`); otherwise left unchanged.
/// * Anything else is returned unchanged.
///
/// # Errors
///
/// Returns a plan error ("Unsupported Subquery plan") when a `Filter` yields a
/// pull-up expression while `self.pull_up_having_expr` is already set, and
/// propagates any error from the helper functions and plan builders.
fn f_up(&mut self, plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
// Schema of the node being rewritten; used below to resolve the columns
// referenced by the join filters.
let subquery_schema = plan.schema();
match &plan {
LogicalPlan::Filter(plan_filter) => {
// Work on the individual conjuncts of the filter predicate.
let subquery_filter_exprs = split_conjunction(&plan_filter.predicate);
// The flag stays true only while every conjunct that references outer
// columns passes `can_pullup_over_aggregation`.
self.can_pull_over_aggregation = self.can_pull_over_aggregation
&& subquery_filter_exprs
.iter()
.filter(|e| e.contains_outer())
.all(|&e| can_pullup_over_aggregation(e));
// Partition the conjuncts: the first list is moved into
// `self.join_filters`, the second is kept for this filter.
// NOTE(review): presumably "join" means correlated with the outer query —
// confirm in `find_join_exprs`.
let (mut join_filters, subquery_filters) =
find_join_exprs(subquery_filter_exprs)?;
if let Some(in_predicate) = &self.in_predicate_opt {
// in_predicate may be already included in the join filters, remove it from the join filters first.
join_filters = remove_duplicated_filter(join_filters, in_predicate);
}
// Columns of this subquery that the join filters refer to; registered in
// `correlated_subquery_cols_map` once the replacement plan is known.
let correlated_subquery_cols =
collect_subquery_cols(&join_filters, subquery_schema)?;
// Accumulate the join filters on the rewriter, skipping ones already present.
for expr in join_filters {
if !self.join_filters.contains(&expr) {
self.join_filters.push(expr)
}
}
// Scratch output for the helper below; this arm does not read it afterwards.
let mut expr_result_map_for_count_bug = HashMap::new();
// Only when the filter's input has a recorded count-bug map AND some
// conjuncts remain: evaluate the remaining predicate against that map.
// The helper may return an expression to pull up instead of keeping the
// filter in place.
let pull_up_expr_opt = if let Some(expr_result_map) =
self.collected_count_expr_map.get(plan_filter.input.deref())
{
if let Some(expr) = conjunction(subquery_filters.clone()) {
filter_exprs_evaluation_result_on_empty_batch(
&expr,
Arc::clone(plan_filter.input.schema()),
expr_result_map,
&mut expr_result_map_for_count_bug,
)?
} else {
None
}
} else {
None
};
match (&pull_up_expr_opt, &self.pull_up_having_expr) {
(Some(_), Some(_)) => {
// A pull-up expression was already recorded by an earlier filter;
// a second one is not supported.
plan_err!("Unsupported Subquery plan")
}
(Some(_), None) => {
// Record the expression and replace the Filter by its input: the
// remaining conjuncts are NOT re-applied in this branch.
self.pull_up_having_expr = pull_up_expr_opt;
let new_plan =
LogicalPlanBuilder::from((*plan_filter.input).clone())
.build()?;
// Key the correlated columns by the replacement plan.
self.correlated_subquery_cols_map
.insert(new_plan.clone(), correlated_subquery_cols);
Ok(Transformed::yes(new_plan))
}
(None, _) => {
// if the subquery still has filter expressions, restore them.
let mut plan =
LogicalPlanBuilder::from((*plan_filter.input).clone());
if let Some(expr) = conjunction(subquery_filters) {
plan = plan.filter(expr)?
}
let new_plan = plan.build()?;
// Key the correlated columns by the replacement plan.
self.correlated_subquery_cols_map
.insert(new_plan.clone(), correlated_subquery_cols);
Ok(Transformed::yes(new_plan))
}
}
}
// Projections are rewritten only when there is an IN predicate or at least
// one join filter has been collected; otherwise they fall through to `_`.
LogicalPlan::Projection(projection)
if self.in_predicate_opt.is_some() || !self.join_filters.is_empty() =>
{
// Gather the correlated columns recorded for this node.
// NOTE(review): presumably looked up via its inputs — confirm in
// `collect_local_correlated_cols`.
let mut local_correlated_cols = BTreeSet::new();
collect_local_correlated_cols(
&plan,
&self.correlated_subquery_cols_map,
&mut local_correlated_cols,
);
// add missing columns to Projection
// (the returned list is used as the complete expression list of the
// rebuilt projection below)
let mut missing_exprs =
self.collect_missing_exprs(&projection.expr, &local_correlated_cols)?;
let mut expr_result_map_for_count_bug = HashMap::new();
// If the input carries a count-bug map, evaluate the projection
// expressions against it and collect the results for this node.
if let Some(expr_result_map) =
self.collected_count_expr_map.get(projection.input.deref())
{
proj_exprs_evaluation_result_on_empty_batch(
&projection.expr,
projection.input.schema(),
expr_result_map,
&mut expr_result_map_for_count_bug,
)?;
if !expr_result_map_for_count_bug.is_empty() {
// has count bug
let un_matched_row = Expr::Column(Column::new_unqualified(
UN_MATCHED_ROW_INDICATOR.to_string(),
));
// add the unmatched rows indicator to the Projection expressions
missing_exprs.push(un_matched_row);
}
}
// Rebuild the projection on top of the (already rewritten) input.
let new_plan = LogicalPlanBuilder::from((*projection.input).clone())
.project(missing_exprs)?
.build()?;
// Register the collected results under the new plan so that parent nodes
// can find them.
if !expr_result_map_for_count_bug.is_empty() {
self.collected_count_expr_map
.insert(new_plan.clone(), expr_result_map_for_count_bug);
}
Ok(Transformed::yes(new_plan))
}
// Same guard as for Projection: only rewritten when there is an IN predicate
// or at least one collected join filter.
LogicalPlan::Aggregate(aggregate)
if self.in_predicate_opt.is_some() || !self.join_filters.is_empty() =>
{
// If the aggregation is from a distinct it will not change the result for
// exists/in subqueries so we can still pull up all predicates.
// (An aggregate without aggregate expressions is treated as a distinct.)
let is_distinct = aggregate.aggr_expr.is_empty();
if !is_distinct {
self.can_pull_up = self.can_pull_up && self.can_pull_over_aggregation;
}
let mut local_correlated_cols = BTreeSet::new();
collect_local_correlated_cols(
&plan,
&self.correlated_subquery_cols_map,
&mut local_correlated_cols,
);
// add missing columns to Aggregation's group expressions
let mut missing_exprs = self.collect_missing_exprs(
&aggregate.group_expr,
&local_correlated_cols,
)?;
// if the original group expressions are empty, need to handle the Count bug
// (only when the rewriter was asked to, and only if the rebuilt aggregate
// actually gained group expressions)
let mut expr_result_map_for_count_bug = HashMap::new();
if self.need_handle_count_bug
&& aggregate.group_expr.is_empty()
&& !missing_exprs.is_empty()
{
agg_exprs_evaluation_result_on_empty_batch(
&aggregate.aggr_expr,
aggregate.input.schema(),
&mut expr_result_map_for_count_bug,
)?;
if !expr_result_map_for_count_bug.is_empty() {
// has count bug
// Unlike the Projection arm, the indicator is created here as a
// literal `true` aliased to the indicator name.
let un_matched_row = lit(true).alias(UN_MATCHED_ROW_INDICATOR);
// add the unmatched rows indicator to the Aggregation's group expressions
missing_exprs.push(un_matched_row);
}
}
// Rebuild the aggregate with the extended group expressions and the
// original aggregate expressions.
let new_plan = LogicalPlanBuilder::from((*aggregate.input).clone())
.aggregate(missing_exprs, aggregate.aggr_expr.to_vec())?
.build()?;
// Register the collected results under the new plan so that parent nodes
// can find them.
if !expr_result_map_for_count_bug.is_empty() {
self.collected_count_expr_map
.insert(new_plan.clone(), expr_result_map_for_count_bug);
}
Ok(Transformed::yes(new_plan))
}
LogicalPlan::SubqueryAlias(alias) => {
let mut local_correlated_cols = BTreeSet::new();
collect_local_correlated_cols(
&plan,
&self.correlated_subquery_cols_map,
&mut local_correlated_cols,
);
// Re-qualify every correlated column with the alias name and register the
// resulting set under the alias node itself.
let mut new_correlated_cols = BTreeSet::new();
for col in local_correlated_cols.iter() {
new_correlated_cols
.insert(Column::new(Some(alias.alias.clone()), col.name.clone()));
}
self.correlated_subquery_cols_map
.insert(plan.clone(), new_correlated_cols);
// Carry the input's count-bug map (if any) over to the alias node.
if let Some(input_map) =
self.collected_count_expr_map.get(alias.input.deref())
{
self.collected_count_expr_map
.insert(plan.clone(), input_map.clone());
}
// The alias node itself is not modified.
Ok(Transformed::no(plan))
}
LogicalPlan::Limit(limit) => {
// Copy the input's count-bug map up front; it is re-registered under the
// resulting plan at the end of this arm.
let input_expr_map = self
.collected_count_expr_map
.get(limit.input.deref())
.cloned();
// handling the limit clause in the subquery
let new_plan = match (self.exists_sub_query, self.join_filters.is_empty())
{
// Correlated exist subquery, remove the limit(so that correlated expressions can pull up)
(true, false) => Transformed::yes(match limit.get_fetch_type()? {
// A literal fetch of 0 yields no rows: substitute an empty relation
// that keeps the input's schema.
FetchType::Literal(Some(0)) => {
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::clone(limit.input.schema()),
})
}
// Any other fetch: drop the Limit and keep only its input.
_ => LogicalPlanBuilder::from((*limit.input).clone()).build()?,
}),
// Not an EXISTS subquery, or no join filters collected: keep the Limit.
_ => Transformed::no(plan),
};
if let Some(input_map) = input_expr_map {
self.collected_count_expr_map
.insert(new_plan.data.clone(), input_map);
}
Ok(new_plan)
}
// All other node kinds (and unguarded Projection/Aggregate) pass through.
_ => Ok(Transformed::no(plan)),
}
}