in datafusion/core/src/physical_optimizer/dist_enforcement.rs [660:742]
/// Tries to bring `join_keys` into the order described by `expected`.
///
/// Outcomes, in the order they are checked:
///
/// * `None` if the number of left keys differs from `expected.len()`.
/// * `Some((join_keys, vec![]))` — the keys untouched and an empty position
///   list — if `expr_list_eq_strict_order` reports that the left or the right
///   keys already equal `expected`, either as given or (only when
///   `equivalence_properties` has at least one class) after all three lists
///   have been normalized with `normalize_expr_with_equivalence_properties`.
/// * Otherwise the first `Some` produced by `expected_expr_positions`, tried
///   on the raw left keys, the raw right keys, the normalized left keys and
///   the normalized right keys, in that order. The returned key pairs are
///   rebuilt by picking `left_keys[pos]` / `right_keys[pos]` for every
///   reported position, and the positions are returned alongside them.
/// * `None` if none of those lookups yields positions.
///
/// # Panics
///
/// Panics if a position reported by `expected_expr_positions` is out of
/// bounds for `left_keys` or `right_keys`.
fn try_reorder(
    join_keys: JoinKeyPairs,
    expected: &[Arc<dyn PhysicalExpr>],
    equivalence_properties: &EquivalenceProperties,
) -> Option<(JoinKeyPairs, Vec<usize>)> {
    if join_keys.left_keys.len() != expected.len() {
        return None;
    }

    // Fast path: one side already matches the expected order exactly.
    if expr_list_eq_strict_order(expected, &join_keys.left_keys)
        || expr_list_eq_strict_order(expected, &join_keys.right_keys)
    {
        return Some((join_keys, vec![]));
    }

    // Maps every expression of a list through the equivalence classes.
    let normalize = |exprs: &[Arc<dyn PhysicalExpr>]| {
        let normalized = exprs
            .iter()
            .map(|expr| {
                normalize_expr_with_equivalence_properties(
                    expr.clone(),
                    equivalence_properties.classes(),
                )
            })
            .collect::<Vec<_>>();
        // Sanity check: a one-to-one mapping must keep the list length.
        assert_eq!(exprs.len(), normalized.len());
        normalized
    };

    // Without any equivalence class the normalized lists stay empty; the
    // position lookups further down are then invoked with empty lists.
    let (normalized_expected, normalized_left_keys, normalized_right_keys) =
        if equivalence_properties.classes().is_empty() {
            (vec![], vec![], vec![])
        } else {
            let wanted = normalize(expected);
            let left = normalize(join_keys.left_keys.as_slice());
            let right = normalize(join_keys.right_keys.as_slice());

            // Second fast path: the order matches once equivalent
            // expressions have been normalized.
            if expr_list_eq_strict_order(&wanted, &left)
                || expr_list_eq_strict_order(&wanted, &right)
            {
                return Some((join_keys, vec![]));
            }
            (wanted, left, right)
        };

    // Lazily try each candidate list; `?` bails out with `None` when no
    // lookup produces positions.
    let positions = expected_expr_positions(&join_keys.left_keys, expected)
        .or_else(|| expected_expr_positions(&join_keys.right_keys, expected))
        .or_else(|| expected_expr_positions(&normalized_left_keys, &normalized_expected))
        .or_else(|| {
            expected_expr_positions(&normalized_right_keys, &normalized_expected)
        })?;

    // Pick the left/right key of each pair in the order given by `positions`.
    let (left_keys, right_keys): (Vec<_>, Vec<_>) = positions
        .iter()
        .map(|&pos| {
            (
                join_keys.left_keys[pos].clone(),
                join_keys.right_keys[pos].clone(),
            )
        })
        .unzip();

    Some((
        JoinKeyPairs {
            left_keys,
            right_keys,
        },
        positions,
    ))
}