in datafusion/optimizer/src/push_down_filter.rs [592:863]
/// Tries to move a filter predicate one level closer to the data source.
///
/// * A bare `LogicalPlan::Join` is handed to `push_down_join` without a parent predicate.
/// * A `LogicalPlan::Filter` is rewritten according to the kind of its direct input
///   (see the per-arm comments in the body).
/// * Every other plan is left alone.
///
/// Returns `Ok(Some(new_plan))` with the rewritten plan, or `Ok(None)` when nothing was
/// rewritten here (unsupported plan / input node, or `push_down_join` returned `None`).
///
/// # Errors
///
/// Propagates any error produced by the helpers it calls (`Filter::try_new`,
/// `with_new_inputs`, `replace_cols_by_name`, `supports_filters_pushdown`, ...).
fn try_optimize(
    &self,
    plan: &LogicalPlan,
    _config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
    let filter = match plan {
        LogicalPlan::Filter(filter) => filter,
        // we also need to pushdown filter in Join.
        LogicalPlan::Join(join) => return push_down_join(plan, join, None),
        _ => return Ok(None),
    };
    let child_plan = filter.input.as_ref();
    let new_plan = match child_plan {
        LogicalPlan::Filter(child_filter) => {
            // Two stacked filters: merge them into a single filter. The parent's
            // conjuncts come first, followed by the child's conjuncts that are not
            // already present in the parent.
            let parents_predicates = split_conjunction(&filter.predicate);
            let set: HashSet<&&Expr> = parents_predicates.iter().collect();
            let new_predicates = parents_predicates
                .iter()
                .chain(
                    split_conjunction(&child_filter.predicate)
                        .iter()
                        .filter(|e| !set.contains(e)),
                )
                .map(|e| (*e).clone())
                .collect::<Vec<_>>();
            // NOTE(review): the parent predicate presumably always contributes at
            // least one conjunct, so this error is not expected in practice.
            let new_predicate = conjunction(new_predicates).ok_or_else(|| {
                DataFusionError::Plan("at least one expression exists".to_string())
            })?;
            let new_filter = LogicalPlan::Filter(Filter::try_new(
                new_predicate,
                child_filter.input.clone(),
            )?);
            // Retry on the merged filter; if nothing more can be done keep the
            // merged filter as the result.
            self.try_optimize(&new_filter, _config)?
                .unwrap_or(new_filter)
        }
        LogicalPlan::Repartition(_)
        | LogicalPlan::Distinct(_)
        | LogicalPlan::Sort(_) => {
            // commutable: swap the two nodes. The filter is re-parented onto the
            // child's first input and the child is re-parented onto the filter.
            let new_filter =
                plan.with_new_inputs(&[child_plan.inputs()[0].clone()])?;
            child_plan.with_new_inputs(&[new_filter])?
        }
        LogicalPlan::SubqueryAlias(subquery_alias) => {
            // Map every aliased output field name to the input column at the same
            // position, so the predicate can be expressed on the alias' input.
            let mut replace_map = HashMap::new();
            for (i, field) in
                subquery_alias.input.schema().fields().iter().enumerate()
            {
                replace_map.insert(
                    subquery_alias
                        .schema
                        .fields()
                        .get(i)
                        // Panics if the alias schema has fewer fields than its input.
                        .unwrap()
                        .qualified_name(),
                    Expr::Column(field.qualified_column()),
                );
            }
            let new_predicate =
                replace_cols_by_name(filter.predicate.clone(), &replace_map)?;
            let new_filter = LogicalPlan::Filter(Filter::try_new(
                new_predicate,
                subquery_alias.input.clone(),
            )?);
            child_plan.with_new_inputs(&[new_filter])?
        }
        LogicalPlan::Projection(projection) => {
            // A projection is filter-commutable, but re-writes all predicate expressions
            // collect projection: output field name -> projected expression (by position).
            let replace_map = projection
                .schema
                .fields()
                .iter()
                .enumerate()
                .map(|(i, field)| {
                    // strip alias, as they should not be part of filters
                    let expr = match &projection.expr[i] {
                        Expr::Alias(Alias { expr, .. }) => expr.as_ref().clone(),
                        expr => expr.clone(),
                    };
                    (field.qualified_name(), expr)
                })
                .collect::<HashMap<_, _>>();
            // re-write all filters based on this projection
            // E.g. in `Filter: b\n Projection: a > 1 as b`, we can swap them, but the filter must be "a > 1"
            let new_filter = LogicalPlan::Filter(Filter::try_new(
                replace_cols_by_name(filter.predicate.clone(), &replace_map)?,
                projection.input.clone(),
            )?);
            child_plan.with_new_inputs(&[new_filter])?
        }
        LogicalPlan::Union(union) => {
            // Place a copy of the filter on top of every union input, with the
            // union's output column names replaced by that input's own columns
            // (matched by position).
            let mut inputs = Vec::with_capacity(union.inputs.len());
            for input in &union.inputs {
                let mut replace_map = HashMap::new();
                for (i, field) in input.schema().fields().iter().enumerate() {
                    replace_map.insert(
                        // Panics if the union schema has fewer fields than this input.
                        union.schema.fields().get(i).unwrap().qualified_name(),
                        Expr::Column(field.qualified_column()),
                    );
                }
                let push_predicate =
                    replace_cols_by_name(filter.predicate.clone(), &replace_map)?;
                inputs.push(Arc::new(LogicalPlan::Filter(Filter::try_new(
                    push_predicate,
                    input.clone(),
                )?)))
            }
            // The rebuilt union takes the schema of the original filter plan.
            LogicalPlan::Union(Union {
                inputs,
                schema: plan.schema().clone(),
            })
        }
        LogicalPlan::Aggregate(agg) => {
            // We can push down Predicate which in groupby_expr.
            let group_expr_columns = agg
                .group_expr
                .iter()
                .map(|e| Ok(Column::from_qualified_name(e.display_name()?)))
                .collect::<Result<HashSet<_>>>()?;
            let predicates = utils::split_conjunction_owned(filter.predicate.clone());
            // A conjunct is pushed below the aggregate only when every column it
            // references is one of the group-by columns; otherwise it stays above.
            let mut keep_predicates = vec![];
            let mut push_predicates = vec![];
            for expr in predicates {
                let cols = expr.to_columns()?;
                if cols.iter().all(|c| group_expr_columns.contains(c)) {
                    push_predicates.push(expr);
                } else {
                    keep_predicates.push(expr);
                }
            }
            // As for plan Filter: Column(a+b) > 0 -- Agg: groupby:[Column(a)+Column(b)]
            // After push, we need to replace `a+b` with Column(a)+Column(b)
            // So we need create a replace_map, add {`a+b` --> Expr(Column(a)+Column(b))}
            let mut replace_map = HashMap::new();
            for expr in &agg.group_expr {
                replace_map.insert(expr.display_name()?, expr.clone());
            }
            let replaced_push_predicates = push_predicates
                .iter()
                .map(|expr| replace_cols_by_name(expr.clone(), &replace_map))
                .collect::<Result<Vec<_>>>()?;
            let child = match conjunction(replaced_push_predicates) {
                // Share the existing `Arc` instead of cloning the input plan into a
                // fresh allocation; the resulting plan is the same.
                Some(predicate) => LogicalPlan::Filter(Filter::try_new(
                    predicate,
                    Arc::clone(&agg.input),
                )?),
                // Nothing was pushed: the aggregate keeps its original input.
                None => (*agg.input).clone(),
            };
            let new_agg = child_plan.with_new_inputs(&[child])?;
            match conjunction(keep_predicates) {
                Some(predicate) => LogicalPlan::Filter(Filter::try_new(
                    predicate,
                    Arc::new(new_agg),
                )?),
                None => new_agg,
            }
        }
        LogicalPlan::Join(join) => {
            // Delegate to the join-specific routine, passing the filter predicate
            // as the parent predicate.
            match push_down_join(&filter.input, join, Some(&filter.predicate))? {
                Some(optimized_plan) => optimized_plan,
                None => return Ok(None),
            }
        }
        LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
            let predicates = utils::split_conjunction_owned(filter.predicate.clone());
            // NOTE(review): the meaning of the two empty vectors and the `false`
            // flag is defined by `push_down_all_join`, which is not visible here.
            push_down_all_join(
                predicates,
                vec![],
                &filter.input,
                left,
                right,
                vec![],
                false,
            )?
        }
        LogicalPlan::TableScan(scan) => {
            // Ask the table source, conjunct by conjunct, how well it can apply
            // each predicate itself.
            let filter_predicates = split_conjunction(&filter.predicate);
            let results = scan
                .source
                .supports_filters_pushdown(filter_predicates.as_slice())?;
            let zip = filter_predicates.iter().zip(results);
            // Everything the source does not report as `Unsupported` is handed
            // to the scan ...
            let new_scan_filters = zip
                .clone()
                .filter(|(_, res)| res != &TableProviderFilterPushDown::Unsupported)
                .map(|(pred, _)| *pred);
            // ... appended to the scan's existing filters, without duplicates.
            let new_scan_filters: Vec<Expr> = scan
                .filters
                .iter()
                .chain(new_scan_filters)
                .unique()
                .cloned()
                .collect();
            // Everything the source does not report as `Exact` must still be
            // evaluated by a filter kept above the scan.
            let new_predicate: Vec<Expr> = zip
                .filter(|(_, res)| res != &TableProviderFilterPushDown::Exact)
                .map(|(pred, _)| (*pred).clone())
                .collect();
            let new_scan = LogicalPlan::TableScan(TableScan {
                source: scan.source.clone(),
                projection: scan.projection.clone(),
                projected_schema: scan.projected_schema.clone(),
                table_name: scan.table_name.clone(),
                filters: new_scan_filters,
                fetch: scan.fetch,
            });
            match conjunction(new_predicate) {
                Some(predicate) => LogicalPlan::Filter(Filter::try_new(
                    predicate,
                    Arc::new(new_scan),
                )?),
                None => new_scan,
            }
        }
        LogicalPlan::Extension(extension_plan) => {
            // The extension node names the columns whose predicates must not be
            // pushed below it.
            let prevent_cols =
                extension_plan.node.prevent_predicate_push_down_columns();
            let predicates = utils::split_conjunction_owned(filter.predicate.clone());
            // A conjunct stays above the node as soon as it references at least one
            // prevented column (matched on the unqualified column name).
            let mut keep_predicates = vec![];
            let mut push_predicates = vec![];
            for expr in predicates {
                let cols = expr.to_columns()?;
                if cols.iter().any(|c| prevent_cols.contains(&c.name)) {
                    keep_predicates.push(expr);
                } else {
                    push_predicates.push(expr);
                }
            }
            // The same combined predicate is placed on top of every input of the
            // extension node.
            let new_children = match conjunction(push_predicates) {
                Some(predicate) => extension_plan
                    .node
                    .inputs()
                    .into_iter()
                    .map(|child| {
                        Ok(LogicalPlan::Filter(Filter::try_new(
                            predicate.clone(),
                            Arc::new(child.clone()),
                        )?))
                    })
                    .collect::<Result<Vec<_>>>()?,
                None => extension_plan.node.inputs().into_iter().cloned().collect(),
            };
            // extension with new inputs.
            let new_extension = child_plan.with_new_inputs(&new_children)?;
            match conjunction(keep_predicates) {
                Some(predicate) => LogicalPlan::Filter(Filter::try_new(
                    predicate,
                    Arc::new(new_extension),
                )?),
                None => new_extension,
            }
        }
        // Any other input node: this rule does not move the filter.
        _ => return Ok(None),
    };
    Ok(Some(new_plan))
}