in datafusion/optimizer/src/decorrelate.rs [95:298]
fn mutate(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> {
let subquery_schema = plan.schema().clone();
match &plan {
LogicalPlan::Filter(plan_filter) => {
let subquery_filter_exprs = split_conjunction(&plan_filter.predicate);
let (mut join_filters, subquery_filters) =
find_join_exprs(subquery_filter_exprs)?;
if let Some(in_predicate) = &self.in_predicate_opt {
// in_predicate may be already included in the join filters, remove it from the join filters first.
join_filters = remove_duplicated_filter(join_filters, in_predicate);
}
let correlated_subquery_cols =
collect_subquery_cols(&join_filters, subquery_schema)?;
for expr in join_filters {
if !self.join_filters.contains(&expr) {
self.join_filters.push(expr)
}
}
let mut expr_result_map_for_count_bug = HashMap::new();
let pull_up_expr_opt = if let Some(expr_result_map) =
self.collected_count_expr_map.get(plan_filter.input.deref())
{
if let Some(expr) = conjunction(subquery_filters.clone()) {
filter_exprs_evaluation_result_on_empty_batch(
&expr,
plan_filter.input.schema().clone(),
expr_result_map,
&mut expr_result_map_for_count_bug,
)?
} else {
None
}
} else {
None
};
match (&pull_up_expr_opt, &self.pull_up_having_expr) {
(Some(_), Some(_)) => {
// Error path
plan_err!("Unsupported Subquery plan")
}
(Some(_), None) => {
self.pull_up_having_expr = pull_up_expr_opt;
let new_plan =
LogicalPlanBuilder::from((*plan_filter.input).clone())
.build()?;
self.correlated_subquery_cols_map
.insert(new_plan.clone(), correlated_subquery_cols);
Ok(new_plan)
}
(None, _) => {
// if the subquery still has filter expressions, restore them.
let mut plan =
LogicalPlanBuilder::from((*plan_filter.input).clone());
if let Some(expr) = conjunction(subquery_filters) {
plan = plan.filter(expr)?
}
let new_plan = plan.build()?;
self.correlated_subquery_cols_map
.insert(new_plan.clone(), correlated_subquery_cols);
Ok(new_plan)
}
}
}
LogicalPlan::Projection(projection)
if self.in_predicate_opt.is_some() || !self.join_filters.is_empty() =>
{
let mut local_correlated_cols = BTreeSet::new();
collect_local_correlated_cols(
&plan,
&self.correlated_subquery_cols_map,
&mut local_correlated_cols,
);
// add missing columns to Projection
let mut missing_exprs =
self.collect_missing_exprs(&projection.expr, &local_correlated_cols)?;
let mut expr_result_map_for_count_bug = HashMap::new();
if let Some(expr_result_map) =
self.collected_count_expr_map.get(projection.input.deref())
{
proj_exprs_evaluation_result_on_empty_batch(
&projection.expr,
projection.input.schema().clone(),
expr_result_map,
&mut expr_result_map_for_count_bug,
)?;
if !expr_result_map_for_count_bug.is_empty() {
// has count bug
let un_matched_row = Expr::Column(Column::new_unqualified(
UN_MATCHED_ROW_INDICATOR.to_string(),
));
// add the unmatched rows indicator to the Projection expressions
missing_exprs.push(un_matched_row);
}
}
let new_plan = LogicalPlanBuilder::from((*projection.input).clone())
.project(missing_exprs)?
.build()?;
if !expr_result_map_for_count_bug.is_empty() {
self.collected_count_expr_map
.insert(new_plan.clone(), expr_result_map_for_count_bug);
}
Ok(new_plan)
}
LogicalPlan::Aggregate(aggregate)
if self.in_predicate_opt.is_some() || !self.join_filters.is_empty() =>
{
let mut local_correlated_cols = BTreeSet::new();
collect_local_correlated_cols(
&plan,
&self.correlated_subquery_cols_map,
&mut local_correlated_cols,
);
// add missing columns to Aggregation's group expressions
let mut missing_exprs = self.collect_missing_exprs(
&aggregate.group_expr,
&local_correlated_cols,
)?;
// if the original group expressions are empty, need to handle the Count bug
let mut expr_result_map_for_count_bug = HashMap::new();
if self.need_handle_count_bug
&& aggregate.group_expr.is_empty()
&& !missing_exprs.is_empty()
{
agg_exprs_evaluation_result_on_empty_batch(
&aggregate.aggr_expr,
aggregate.input.schema().clone(),
&mut expr_result_map_for_count_bug,
)?;
if !expr_result_map_for_count_bug.is_empty() {
// has count bug
let un_matched_row = Expr::Alias(Alias::new(
Expr::Literal(ScalarValue::Boolean(Some(true))),
UN_MATCHED_ROW_INDICATOR.to_string(),
));
// add the unmatched rows indicator to the Aggregation's group expressions
missing_exprs.push(un_matched_row);
}
}
let new_plan = LogicalPlanBuilder::from((*aggregate.input).clone())
.aggregate(missing_exprs, aggregate.aggr_expr.to_vec())?
.build()?;
if !expr_result_map_for_count_bug.is_empty() {
self.collected_count_expr_map
.insert(new_plan.clone(), expr_result_map_for_count_bug);
}
Ok(new_plan)
}
LogicalPlan::SubqueryAlias(alias) => {
let mut local_correlated_cols = BTreeSet::new();
collect_local_correlated_cols(
&plan,
&self.correlated_subquery_cols_map,
&mut local_correlated_cols,
);
let mut new_correlated_cols = BTreeSet::new();
for col in local_correlated_cols.iter() {
new_correlated_cols
.insert(Column::new(Some(alias.alias.clone()), col.name.clone()));
}
self.correlated_subquery_cols_map
.insert(plan.clone(), new_correlated_cols);
if let Some(input_map) =
self.collected_count_expr_map.get(alias.input.deref())
{
self.collected_count_expr_map
.insert(plan.clone(), input_map.clone());
}
Ok(plan)
}
LogicalPlan::Limit(limit) => {
let input_expr_map = self
.collected_count_expr_map
.get(limit.input.deref())
.cloned();
// handling the limit clause in the subquery
let new_plan = match (self.exists_sub_query, self.join_filters.is_empty())
{
// Correlated exist subquery, remove the limit(so that correlated expressions can pull up)
(true, false) => {
if limit.fetch.filter(|limit_row| *limit_row == 0).is_some() {
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: limit.input.schema().clone(),
})
} else {
LogicalPlanBuilder::from((*limit.input).clone()).build()?
}
}
_ => plan,
};
if let Some(input_map) = input_expr_map {
self.collected_count_expr_map
.insert(new_plan.clone(), input_map);
}
Ok(new_plan)
}
_ => Ok(plan),
}
}