fn push_down_all_join()

in datafusion/optimizer/src/push_down_filter.rs [346:480]


fn push_down_all_join(
    predicates: Vec<Expr>,
    infer_predicates: Vec<Expr>,
    join_plan: &LogicalPlan,
    left: &LogicalPlan,
    right: &LogicalPlan,
    on_filter: Vec<Expr>,
    is_inner_join: bool,
) -> Result<LogicalPlan> {
    // Rewrites `join_plan` so that as many predicates as the join type allows are
    // evaluated on its inputs (`left` / `right`) instead of at/above the join.
    //
    // * `predicates`: filter conjuncts sitting above the join. Each is pushed into
    //   a preserved input that can evaluate it, else turned into a join condition
    //   (only when `is_inner_join`), else kept in a filter above the rebuilt join.
    // * `infer_predicates`: extra inferred predicates; pushed into an input when
    //   possible and dropped otherwise.
    // * `on_filter`: presumably the conjuncts of the join's ON filter (its
    //   expression is the trailing element of `join_plan.expressions()` when
    //   non-empty). Pushed into an input only when that input is preserved for
    //   ON-clause purposes (`on_lr_is_preserved`), else left in the join filter.
    //
    // Returns the rebuilt join (via `from_plan`), wrapped in a `Filter` with the
    // kept predicates if there are any. Errors from the preservation/pushdown
    // checks, `Filter::try_new` and `from_plan` are propagated.
    let on_filter_empty = on_filter.is_empty();
    // Inputs that may be filtered by predicates evaluated *after* the join.
    let (left_preserved, right_preserved) = lr_is_preserved(join_plan)?;

    // The predicates can be divided into three categories:
    // 1) can be pushed through the join into one of its children (left or right)
    // 2) can be converted to join conditions if the join type is Inner
    // 3) must be kept as filter conditions above the join
    let mut left_push = vec![];
    let mut right_push = vec![];
    let mut keep_predicates = vec![];
    // Join conditions that originate from `predicates` (inner joins only).
    let mut join_conditions = vec![];
    // ON-filter conjuncts that could not be pushed into either input. Kept apart
    // from `join_conditions` because OR-clause extraction on them must be gated by
    // the ON-clause preservation flags, not the post-join ones.
    let mut on_filter_join_conditions = vec![];

    for predicate in predicates {
        if left_preserved && can_pushdown_join_predicate(&predicate, left.schema())? {
            left_push.push(predicate);
        } else if right_preserved
            && can_pushdown_join_predicate(&predicate, right.schema())?
        {
            right_push.push(predicate);
        } else if is_inner_join && can_evaluate_as_join_condition(&predicate)? {
            // Eq and non-eq predicates are treated alike here; ExtractEquijoinPredicate
            // later extracts the eq predicates and converts them to join ON keys.
            join_conditions.push(predicate);
        } else {
            keep_predicates.push(predicate);
        }
    }

    // Inferred predicates are only useful when pushed into an input; drop the rest.
    for predicate in infer_predicates {
        if left_preserved && can_pushdown_join_predicate(&predicate, left.schema())? {
            left_push.push(predicate);
        } else if right_preserved
            && can_pushdown_join_predicate(&predicate, right.schema())?
        {
            right_push.push(predicate);
        }
    }

    // Inputs that may be filtered by predicates of the join's ON clause. Only
    // looked up when there is an ON filter to classify; otherwise no ON-derived
    // predicate exists for these flags to govern.
    let (on_left_preserved, on_right_preserved) = if on_filter_empty {
        (false, false)
    } else {
        on_lr_is_preserved(join_plan)?
    };
    for on in on_filter {
        if on_left_preserved && can_pushdown_join_predicate(&on, left.schema())? {
            left_push.push(on);
        } else if on_right_preserved
            && can_pushdown_join_predicate(&on, right.schema())?
        {
            right_push.push(on);
        } else {
            on_filter_join_conditions.push(on);
        }
    }

    // Extract from OR clauses of the unpushable predicates new predicates for
    // either side of the join, if possible. Each source must be gated by the flags
    // matching where its predicates are evaluated: WHERE-level predicates
    // (`keep_predicates`, `join_conditions`) by the post-join flags, ON leftovers
    // by the ON-clause flags. In SQL, an ON predicate of a LEFT join must not
    // filter the left input (unmatched left rows are still emitted, null-padded),
    // so gating ON leftovers by `left_preserved` would produce wrong results.
    let keep_refs: Vec<&Expr> = keep_predicates.iter().collect();
    let join_cond_refs: Vec<&Expr> = join_conditions.iter().collect();
    let on_cond_refs: Vec<&Expr> = on_filter_join_conditions.iter().collect();

    let or_to_left = extract_or_clauses_for_join(&keep_refs, left.schema(), left_preserved);
    let or_to_right =
        extract_or_clauses_for_join(&keep_refs, right.schema(), right_preserved);
    let join_cond_or_to_left =
        extract_or_clauses_for_join(&join_cond_refs, left.schema(), left_preserved);
    let join_cond_or_to_right =
        extract_or_clauses_for_join(&join_cond_refs, right.schema(), right_preserved);
    let on_or_to_left =
        extract_or_clauses_for_join(&on_cond_refs, left.schema(), on_left_preserved);
    let on_or_to_right =
        extract_or_clauses_for_join(&on_cond_refs, right.schema(), on_right_preserved);

    left_push.extend(or_to_left);
    left_push.extend(join_cond_or_to_left);
    left_push.extend(on_or_to_left);
    right_push.extend(or_to_right);
    right_push.extend(join_cond_or_to_right);
    right_push.extend(on_or_to_right);

    let left = match conjunction(left_push) {
        Some(predicate) => {
            LogicalPlan::Filter(Filter::try_new(predicate, Arc::new(left.clone()))?)
        }
        None => left.clone(),
    };
    let right = match conjunction(right_push) {
        Some(predicate) => {
            LogicalPlan::Filter(Filter::try_new(predicate, Arc::new(right.clone()))?)
        }
        None => right.clone(),
    };

    // Create a new Join with the new `left` and `right`.
    //
    // expressions() output for Join is a vector consisting of
    //   1. join keys - columns mentioned in ON clause
    //   2. optional predicate - in case join filter is not empty,
    //      it always will be the last element, otherwise result
    //      vector will contain only join keys (without additional
    //      element representing filter).
    // The old filter is replaced by the rebuilt one below, so drop it here.
    let mut new_exprs = join_plan.expressions();
    if !on_filter_empty {
        new_exprs.pop();
    }
    // New join filter: conditions from `predicates` first, then ON leftovers.
    join_conditions.extend(on_filter_join_conditions);
    if let Some(join_filter) = conjunction(join_conditions) {
        new_exprs.push(join_filter);
    }
    let plan = from_plan(join_plan, &new_exprs, &[left, right])?;

    // Wrap the join in a filter holding the predicates that must be kept, if any.
    match conjunction(keep_predicates) {
        Some(predicate) => Ok(LogicalPlan::Filter(Filter::try_new(
            predicate,
            Arc::new(plan),
        )?)),
        None => Ok(plan),
    }
}