fn try_optimize()

in datafusion/optimizer/src/push_down_filter.rs [592:863]


    /// Attempts to move the predicate of a `Filter` node below its input.
    ///
    /// A bare `Join` is also accepted: it is handed to `push_down_join` with
    /// no parent predicate so that the join's own filter can be pushed.
    ///
    /// Returns `Ok(None)` when `plan` is neither a `Filter` nor a `Join`, when
    /// the filter's input is a node kind that has no arm below, or when
    /// `push_down_join` returns `None` for a filter sitting above a join.
    /// Otherwise returns `Ok(Some(rewritten_plan))`.
    ///
    /// # Errors
    ///
    /// Propagates any error produced while rebuilding plan nodes or rewriting
    /// expressions. Returns `DataFusionError::Internal` when a table source
    /// answers `supports_filters_pushdown` with a number of results different
    /// from the number of predicates it was asked about.
    fn try_optimize(
        &self,
        plan: &LogicalPlan,
        _config: &dyn OptimizerConfig,
    ) -> Result<Option<LogicalPlan>> {
        let filter = match plan {
            LogicalPlan::Filter(filter) => filter,
            // we also need to pushdown filter in Join.
            LogicalPlan::Join(join) => return push_down_join(plan, join, None),
            _ => return Ok(None),
        };

        let child_plan = filter.input.as_ref();
        let new_plan = match child_plan {
            LogicalPlan::Filter(child_filter) => {
                // Merge the two stacked filters into one: keep every parent
                // conjunct, then append the child conjuncts that the parent
                // does not already contain.
                let parents_predicates = split_conjunction(&filter.predicate);
                let set: HashSet<&&Expr> = parents_predicates.iter().collect();

                let new_predicates = parents_predicates
                    .iter()
                    .chain(
                        split_conjunction(&child_filter.predicate)
                            .iter()
                            .filter(|e| !set.contains(e)),
                    )
                    .map(|e| (*e).clone())
                    .collect::<Vec<_>>();
                let new_predicate = conjunction(new_predicates).ok_or_else(|| {
                    DataFusionError::Plan("at least one expression exists".to_string())
                })?;
                let new_filter = LogicalPlan::Filter(Filter::try_new(
                    new_predicate,
                    child_filter.input.clone(),
                )?);
                // Keep pushing the merged filter; if nothing further applies,
                // the merged filter itself is the result.
                self.try_optimize(&new_filter, _config)?
                    .unwrap_or(new_filter)
            }
            LogicalPlan::Repartition(_)
            | LogicalPlan::Distinct(_)
            | LogicalPlan::Sort(_) => {
                // commutable: swap the filter with its child, so the filter
                // reads from the child's first input and the child reads from
                // the filter.
                let new_filter =
                    plan.with_new_inputs(&[child_plan.inputs()[0].clone()])?;
                child_plan.with_new_inputs(&[new_filter])?
            }
            LogicalPlan::SubqueryAlias(subquery_alias) => {
                // Map each aliased output column name to the input column at
                // the same position, so the predicate can be expressed against
                // the alias' input.
                let mut replace_map = HashMap::new();
                for (alias_field, input_field) in subquery_alias
                    .schema
                    .fields()
                    .iter()
                    .zip(subquery_alias.input.schema().fields().iter())
                {
                    replace_map.insert(
                        alias_field.qualified_name(),
                        Expr::Column(input_field.qualified_column()),
                    );
                }
                let new_predicate =
                    replace_cols_by_name(filter.predicate.clone(), &replace_map)?;
                let new_filter = LogicalPlan::Filter(Filter::try_new(
                    new_predicate,
                    subquery_alias.input.clone(),
                )?);
                child_plan.with_new_inputs(&[new_filter])?
            }
            LogicalPlan::Projection(projection) => {
                // A projection is filter-commutable, but re-writes all predicate expressions
                // collect projection: output field name -> producing expression,
                // paired by position.
                let replace_map = projection
                    .schema
                    .fields()
                    .iter()
                    .zip(projection.expr.iter())
                    .map(|(field, projected)| {
                        // strip alias, as they should not be part of filters
                        let expr = match projected {
                            Expr::Alias(Alias { expr, .. }) => expr.as_ref().clone(),
                            expr => expr.clone(),
                        };

                        (field.qualified_name(), expr)
                    })
                    .collect::<HashMap<_, _>>();

                // re-write all filters based on this projection
                // E.g. in `Filter: b\n  Projection: a > 1 as b`, we can swap them, but the filter must be "a > 1"
                let new_filter = LogicalPlan::Filter(Filter::try_new(
                    replace_cols_by_name(filter.predicate.clone(), &replace_map)?,
                    projection.input.clone(),
                )?);

                child_plan.with_new_inputs(&[new_filter])?
            }
            LogicalPlan::Union(union) => {
                // Wrap every union input in its own copy of the filter, with
                // the union's column names rewritten to that input's columns
                // (paired by position).
                let mut inputs = Vec::with_capacity(union.inputs.len());
                for input in &union.inputs {
                    let mut replace_map = HashMap::new();
                    for (union_field, input_field) in union
                        .schema
                        .fields()
                        .iter()
                        .zip(input.schema().fields().iter())
                    {
                        replace_map.insert(
                            union_field.qualified_name(),
                            Expr::Column(input_field.qualified_column()),
                        );
                    }

                    let push_predicate =
                        replace_cols_by_name(filter.predicate.clone(), &replace_map)?;
                    inputs.push(Arc::new(LogicalPlan::Filter(Filter::try_new(
                        push_predicate,
                        input.clone(),
                    )?)))
                }
                LogicalPlan::Union(Union {
                    inputs,
                    schema: plan.schema().clone(),
                })
            }
            LogicalPlan::Aggregate(agg) => {
                // We can push down Predicate which in groupby_expr.
                let group_expr_columns = agg
                    .group_expr
                    .iter()
                    .map(|e| Ok(Column::from_qualified_name(e.display_name()?)))
                    .collect::<Result<HashSet<_>>>()?;

                let predicates = utils::split_conjunction_owned(filter.predicate.clone());

                // A conjunct is pushed only when every column it references
                // is a group-by column; everything else stays above the
                // aggregate.
                let mut keep_predicates = vec![];
                let mut push_predicates = vec![];
                for expr in predicates {
                    let cols = expr.to_columns()?;
                    if cols.iter().all(|c| group_expr_columns.contains(c)) {
                        push_predicates.push(expr);
                    } else {
                        keep_predicates.push(expr);
                    }
                }

                // As for plan Filter: Column(a+b) > 0 -- Agg: groupby:[Column(a)+Column(b)]
                // After push, we need to replace `a+b` with Column(a)+Column(b)
                // So we need create a replace_map, add {`a+b` --> Expr(Column(a)+Column(b))}
                let mut replace_map = HashMap::new();
                for expr in &agg.group_expr {
                    replace_map.insert(expr.display_name()?, expr.clone());
                }
                let replaced_push_predicates = push_predicates
                    .iter()
                    .map(|expr| replace_cols_by_name(expr.clone(), &replace_map))
                    .collect::<Result<Vec<_>>>()?;

                let child = match conjunction(replaced_push_predicates) {
                    // Share the existing `Arc` instead of deep-cloning the
                    // aggregate's input plan just to re-wrap it.
                    Some(predicate) => LogicalPlan::Filter(Filter::try_new(
                        predicate,
                        agg.input.clone(),
                    )?),
                    None => (*agg.input).clone(),
                };
                let new_agg = filter.input.with_new_inputs(&[child])?;
                match conjunction(keep_predicates) {
                    Some(predicate) => LogicalPlan::Filter(Filter::try_new(
                        predicate,
                        Arc::new(new_agg),
                    )?),
                    None => new_agg,
                }
            }
            LogicalPlan::Join(join) => {
                match push_down_join(&filter.input, join, Some(&filter.predicate))? {
                    Some(optimized_plan) => optimized_plan,
                    None => return Ok(None),
                }
            }
            LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
                let predicates = utils::split_conjunction_owned(filter.predicate.clone());
                push_down_all_join(
                    predicates,
                    vec![],
                    &filter.input,
                    left,
                    right,
                    vec![],
                    false,
                )?
            }
            LogicalPlan::TableScan(scan) => {
                let filter_predicates = split_conjunction(&filter.predicate);
                let results = scan
                    .source
                    .supports_filters_pushdown(filter_predicates.as_slice())?;
                // The predicates are paired with the results by position
                // below. A shorter result list would make `zip` silently drop
                // the unmatched predicates from the plan, so reject any
                // mismatch up front.
                if results.len() != filter_predicates.len() {
                    return Err(DataFusionError::Internal(format!(
                        "supports_filters_pushdown returned {} results for {} filters",
                        results.len(),
                        filter_predicates.len()
                    )));
                }
                let zip = filter_predicates.iter().zip(results.into_iter());

                // Everything the source does not report as `Unsupported` is
                // added to the scan's filters (deduplicated against the
                // filters the scan already carries).
                let new_scan_filters = zip
                    .clone()
                    .filter(|(_, res)| res != &TableProviderFilterPushDown::Unsupported)
                    .map(|(pred, _)| *pred);
                let new_scan_filters: Vec<Expr> = scan
                    .filters
                    .iter()
                    .chain(new_scan_filters)
                    .unique()
                    .cloned()
                    .collect();
                // Everything the source does not report as `Exact` is kept in
                // a filter above the scan.
                let new_predicate: Vec<Expr> = zip
                    .filter(|(_, res)| res != &TableProviderFilterPushDown::Exact)
                    .map(|(pred, _)| (*pred).clone())
                    .collect();

                let new_scan = LogicalPlan::TableScan(TableScan {
                    source: scan.source.clone(),
                    projection: scan.projection.clone(),
                    projected_schema: scan.projected_schema.clone(),
                    table_name: scan.table_name.clone(),
                    filters: new_scan_filters,
                    fetch: scan.fetch,
                });

                match conjunction(new_predicate) {
                    Some(predicate) => LogicalPlan::Filter(Filter::try_new(
                        predicate,
                        Arc::new(new_scan),
                    )?),
                    None => new_scan,
                }
            }
            LogicalPlan::Extension(extension_plan) => {
                let prevent_cols =
                    extension_plan.node.prevent_predicate_push_down_columns();

                let predicates = utils::split_conjunction_owned(filter.predicate.clone());

                // A conjunct that references any column named in
                // `prevent_cols` stays above the extension node; the rest are
                // pushed to every input of the node.
                let mut keep_predicates = vec![];
                let mut push_predicates = vec![];
                for expr in predicates {
                    let cols = expr.to_columns()?;
                    if cols.iter().any(|c| prevent_cols.contains(&c.name)) {
                        keep_predicates.push(expr);
                    } else {
                        push_predicates.push(expr);
                    }
                }

                let new_children = match conjunction(push_predicates) {
                    Some(predicate) => extension_plan
                        .node
                        .inputs()
                        .into_iter()
                        .map(|child| {
                            Ok(LogicalPlan::Filter(Filter::try_new(
                                predicate.clone(),
                                Arc::new(child.clone()),
                            )?))
                        })
                        .collect::<Result<Vec<_>>>()?,
                    None => extension_plan.node.inputs().into_iter().cloned().collect(),
                };
                // extension with new inputs.
                let new_extension = child_plan.with_new_inputs(&new_children)?;

                match conjunction(keep_predicates) {
                    Some(predicate) => LogicalPlan::Filter(Filter::try_new(
                        predicate,
                        Arc::new(new_extension),
                    )?),
                    None => new_extension,
                }
            }
            _ => return Ok(None),
        };
        Ok(Some(new_plan))
    }