fn push_down_all_join()

in datafusion/optimizer/src/push_down_filter.rs [418:523]


fn push_down_all_join(
    predicates: Vec<Expr>,
    inferred_join_predicates: Vec<Expr>,
    mut join: Join,
    on_filter: Vec<Expr>,
) -> Result<Transformed<LogicalPlan>> {
    let is_inner_join = join.join_type == JoinType::Inner;
    // Get pushable predicates from current optimizer state
    let (left_preserved, right_preserved) = lr_is_preserved(join.join_type);

    // The predicates can be divided to three categories:
    // 1) can push through join to its children(left or right)
    // 2) can be converted to join conditions if the join type is Inner
    // 3) should be kept as filter conditions
    let left_schema = join.left.schema();
    let right_schema = join.right.schema();
    let mut left_push = vec![];
    let mut right_push = vec![];
    let mut keep_predicates = vec![];
    let mut join_conditions = vec![];
    let mut checker = ColumnChecker::new(left_schema, right_schema);
    for predicate in predicates {
        if left_preserved && checker.is_left_only(&predicate) {
            left_push.push(predicate);
        } else if right_preserved && checker.is_right_only(&predicate) {
            right_push.push(predicate);
        } else if is_inner_join && can_evaluate_as_join_condition(&predicate)? {
            // Here we do not differ it is eq or non-eq predicate, ExtractEquijoinPredicate will extract the eq predicate
            // and convert to the join on condition
            join_conditions.push(predicate);
        } else {
            keep_predicates.push(predicate);
        }
    }

    // For infer predicates, if they can not push through join, just drop them
    for predicate in inferred_join_predicates {
        if left_preserved && checker.is_left_only(&predicate) {
            left_push.push(predicate);
        } else if right_preserved && checker.is_right_only(&predicate) {
            right_push.push(predicate);
        }
    }

    let mut on_filter_join_conditions = vec![];
    let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(join.join_type);

    if !on_filter.is_empty() {
        for on in on_filter {
            if on_left_preserved && checker.is_left_only(&on) {
                left_push.push(on)
            } else if on_right_preserved && checker.is_right_only(&on) {
                right_push.push(on)
            } else {
                on_filter_join_conditions.push(on)
            }
        }
    }

    // Extract from OR clause, generate new predicates for both side of join if possible.
    // We only track the unpushable predicates above.
    if left_preserved {
        left_push.extend(extract_or_clauses_for_join(&keep_predicates, left_schema));
        left_push.extend(extract_or_clauses_for_join(&join_conditions, left_schema));
    }
    if right_preserved {
        right_push.extend(extract_or_clauses_for_join(&keep_predicates, right_schema));
        right_push.extend(extract_or_clauses_for_join(&join_conditions, right_schema));
    }

    // For predicates from join filter, we should check with if a join side is preserved
    // in term of join filtering.
    if on_left_preserved {
        left_push.extend(extract_or_clauses_for_join(
            &on_filter_join_conditions,
            left_schema,
        ));
    }
    if on_right_preserved {
        right_push.extend(extract_or_clauses_for_join(
            &on_filter_join_conditions,
            right_schema,
        ));
    }

    if let Some(predicate) = conjunction(left_push) {
        join.left = Arc::new(LogicalPlan::Filter(Filter::try_new(predicate, join.left)?));
    }
    if let Some(predicate) = conjunction(right_push) {
        join.right =
            Arc::new(LogicalPlan::Filter(Filter::try_new(predicate, join.right)?));
    }

    // Add any new join conditions as the non join predicates
    join_conditions.extend(on_filter_join_conditions);
    join.filter = conjunction(join_conditions);

    // wrap the join on the filter whose predicates must be kept, if any
    let plan = LogicalPlan::Join(join);
    let plan = if let Some(predicate) = conjunction(keep_predicates) {
        LogicalPlan::Filter(Filter::try_new(predicate, Arc::new(plan))?)
    } else {
        plan
    };
    Ok(Transformed::yes(plan))
}