in datafusion/optimizer/src/push_down_filter.rs [346:480]
fn push_down_all_join(
predicates: Vec<Expr>,
infer_predicates: Vec<Expr>,
join_plan: &LogicalPlan,
left: &LogicalPlan,
right: &LogicalPlan,
on_filter: Vec<Expr>,
is_inner_join: bool,
) -> Result<LogicalPlan> {
let on_filter_empty = on_filter.is_empty();
// Get pushable predicates from current optimizer state
let (left_preserved, right_preserved) = lr_is_preserved(join_plan)?;
// The predicates can be divided to three categories:
// 1) can push through join to its children(left or right)
// 2) can be converted to join conditions if the join type is Inner
// 3) should be kept as filter conditions
let mut left_push = vec![];
let mut right_push = vec![];
let mut keep_predicates = vec![];
let mut join_conditions = vec![];
for predicate in predicates {
if left_preserved && can_pushdown_join_predicate(&predicate, left.schema())? {
left_push.push(predicate);
} else if right_preserved
&& can_pushdown_join_predicate(&predicate, right.schema())?
{
right_push.push(predicate);
} else if is_inner_join && can_evaluate_as_join_condition(&predicate)? {
// Here we do not differ it is eq or non-eq predicate, ExtractEquijoinPredicate will extract the eq predicate
// and convert to the join on condition
join_conditions.push(predicate);
} else {
keep_predicates.push(predicate);
}
}
// For infer predicates, if they can not push through join, just drop them
for predicate in infer_predicates {
if left_preserved && can_pushdown_join_predicate(&predicate, left.schema())? {
left_push.push(predicate);
} else if right_preserved
&& can_pushdown_join_predicate(&predicate, right.schema())?
{
right_push.push(predicate);
}
}
if !on_filter.is_empty() {
let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(join_plan)?;
for on in on_filter {
if on_left_preserved && can_pushdown_join_predicate(&on, left.schema())? {
left_push.push(on)
} else if on_right_preserved
&& can_pushdown_join_predicate(&on, right.schema())?
{
right_push.push(on)
} else {
join_conditions.push(on)
}
}
}
// Extract from OR clause, generate new predicates for both side of join if possible.
// We only track the unpushable predicates above.
let or_to_left = extract_or_clauses_for_join(
&keep_predicates.iter().collect::<Vec<_>>(),
left.schema(),
left_preserved,
);
let or_to_right = extract_or_clauses_for_join(
&keep_predicates.iter().collect::<Vec<_>>(),
right.schema(),
right_preserved,
);
let on_or_to_left = extract_or_clauses_for_join(
&join_conditions.iter().collect::<Vec<_>>(),
left.schema(),
left_preserved,
);
let on_or_to_right = extract_or_clauses_for_join(
&join_conditions.iter().collect::<Vec<_>>(),
right.schema(),
right_preserved,
);
left_push.extend(or_to_left);
left_push.extend(on_or_to_left);
right_push.extend(or_to_right);
right_push.extend(on_or_to_right);
let left = match conjunction(left_push) {
Some(predicate) => {
LogicalPlan::Filter(Filter::try_new(predicate, Arc::new(left.clone()))?)
}
None => left.clone(),
};
let right = match conjunction(right_push) {
Some(predicate) => {
LogicalPlan::Filter(Filter::try_new(predicate, Arc::new(right.clone()))?)
}
None => right.clone(),
};
// Create a new Join with the new `left` and `right`
//
// expressions() output for Join is a vector consisting of
// 1. join keys - columns mentioned in ON clause
// 2. optional predicate - in case join filter is not empty,
// it always will be the last element, otherwise result
// vector will contain only join keys (without additional
// element representing filter).
let expr = join_plan.expressions();
let mut new_exprs = if !on_filter_empty {
expr[..expr.len() - 1].to_vec()
} else {
expr
};
if !join_conditions.is_empty() {
new_exprs.push(join_conditions.into_iter().reduce(Expr::and).unwrap());
}
let plan = from_plan(join_plan, &new_exprs, &[left, right])?;
if keep_predicates.is_empty() {
Ok(plan)
} else {
// wrap the join on the filter whose predicates must be kept
match conjunction(keep_predicates) {
Some(predicate) => Ok(LogicalPlan::Filter(Filter::try_new(
predicate,
Arc::new(plan),
)?)),
None => Ok(plan),
}
}
}