in datafusion/optimizer/src/push_down_filter.rs [418:523]
fn push_down_all_join(
predicates: Vec<Expr>,
inferred_join_predicates: Vec<Expr>,
mut join: Join,
on_filter: Vec<Expr>,
) -> Result<Transformed<LogicalPlan>> {
let is_inner_join = join.join_type == JoinType::Inner;
// Get pushable predicates from current optimizer state
let (left_preserved, right_preserved) = lr_is_preserved(join.join_type);
// The predicates can be divided to three categories:
// 1) can push through join to its children(left or right)
// 2) can be converted to join conditions if the join type is Inner
// 3) should be kept as filter conditions
let left_schema = join.left.schema();
let right_schema = join.right.schema();
let mut left_push = vec![];
let mut right_push = vec![];
let mut keep_predicates = vec![];
let mut join_conditions = vec![];
let mut checker = ColumnChecker::new(left_schema, right_schema);
for predicate in predicates {
if left_preserved && checker.is_left_only(&predicate) {
left_push.push(predicate);
} else if right_preserved && checker.is_right_only(&predicate) {
right_push.push(predicate);
} else if is_inner_join && can_evaluate_as_join_condition(&predicate)? {
// Here we do not differ it is eq or non-eq predicate, ExtractEquijoinPredicate will extract the eq predicate
// and convert to the join on condition
join_conditions.push(predicate);
} else {
keep_predicates.push(predicate);
}
}
// For infer predicates, if they can not push through join, just drop them
for predicate in inferred_join_predicates {
if left_preserved && checker.is_left_only(&predicate) {
left_push.push(predicate);
} else if right_preserved && checker.is_right_only(&predicate) {
right_push.push(predicate);
}
}
let mut on_filter_join_conditions = vec![];
let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(join.join_type);
if !on_filter.is_empty() {
for on in on_filter {
if on_left_preserved && checker.is_left_only(&on) {
left_push.push(on)
} else if on_right_preserved && checker.is_right_only(&on) {
right_push.push(on)
} else {
on_filter_join_conditions.push(on)
}
}
}
// Extract from OR clause, generate new predicates for both side of join if possible.
// We only track the unpushable predicates above.
if left_preserved {
left_push.extend(extract_or_clauses_for_join(&keep_predicates, left_schema));
left_push.extend(extract_or_clauses_for_join(&join_conditions, left_schema));
}
if right_preserved {
right_push.extend(extract_or_clauses_for_join(&keep_predicates, right_schema));
right_push.extend(extract_or_clauses_for_join(&join_conditions, right_schema));
}
// For predicates from join filter, we should check with if a join side is preserved
// in term of join filtering.
if on_left_preserved {
left_push.extend(extract_or_clauses_for_join(
&on_filter_join_conditions,
left_schema,
));
}
if on_right_preserved {
right_push.extend(extract_or_clauses_for_join(
&on_filter_join_conditions,
right_schema,
));
}
if let Some(predicate) = conjunction(left_push) {
join.left = Arc::new(LogicalPlan::Filter(Filter::try_new(predicate, join.left)?));
}
if let Some(predicate) = conjunction(right_push) {
join.right =
Arc::new(LogicalPlan::Filter(Filter::try_new(predicate, join.right)?));
}
// Add any new join conditions as the non join predicates
join_conditions.extend(on_filter_join_conditions);
join.filter = conjunction(join_conditions);
// wrap the join on the filter whose predicates must be kept, if any
let plan = LogicalPlan::Join(join);
let plan = if let Some(predicate) = conjunction(keep_predicates) {
LogicalPlan::Filter(Filter::try_new(predicate, Arc::new(plan))?)
} else {
plan
};
Ok(Transformed::yes(plan))
}