in datafusion/core/src/physical_optimizer/dist_enforcement.rs [144:300]
/// Decides how the key-ordering requirement attached to a plan node is handed on to
/// the node's children.
///
/// The parent's requirement is read from `requirements.required_key_ordering` and then
/// treated according to the concrete operator behind `requirements.plan`:
///
/// * `HashJoinExec` in `Partitioned` mode and `SortMergeJoinExec`: delegated to
///   `reorder_partitioned_join_keys`, which also receives a closure able to rebuild
///   the join from a new set of join conditions.
/// * `HashJoinExec` in `CollectLeft` mode and `CrossJoinExec`: the left child is
///   requested nothing (`None`); only the right child may receive a request.
/// * `HashJoinExec` in `Auto` mode, `RepartitionExec`, `CoalescePartitionsExec` and
///   `WindowAggExec`: the node is rebuilt with `PlanWithKeyRequirements::new`, i.e. the
///   current requirement is not passed on.
/// * `AggregateExec`: left unchanged when the parent requires nothing; otherwise
///   `FinalPartitioned` aggregates go through `reorder_aggregate_keys` and every other
///   mode is rebuilt with `PlanWithKeyRequirements::new`.
/// * `ProjectionExec`: the requirement is mapped to the columns before the projection
///   and requested from the child only when the mapped list has the same length as
///   the original one.
/// * Any other operator: each child is requested the parent's requirement unchanged.
///
/// Returns `Transformed::No(requirements)` only in the "aggregate, nothing required"
/// case and `Transformed::Yes(..)` in all others. Errors raised by the join
/// constructors or the reorder helpers are propagated to the caller.
fn adjust_input_keys_ordering(
    requirements: PlanWithKeyRequirements,
) -> Result<Transformed<PlanWithKeyRequirements>> {
    let parent_keys = requirements.required_key_ordering.clone();
    let node = requirements.plan.as_any();

    // Can not satisfy the parent: clear the current requirements and start over with
    // new empty requirements for the same plan.
    let reset = || PlanWithKeyRequirements::new(requirements.plan.clone());
    // Same plan, nothing required from the node itself, `requests` handed to the children.
    let forward = |requests| PlanWithKeyRequirements {
        plan: requirements.plan.clone(),
        required_key_ordering: vec![],
        request_key_ordering: requests,
    };

    let adjusted = if let Some(HashJoinExec {
        left,
        right,
        on,
        filter,
        join_type,
        mode,
        null_equals_null,
        ..
    }) = node.downcast_ref::<HashJoinExec>()
    {
        match mode {
            PartitionMode::Partitioned => {
                // A hash join has no sort options, so that half of the tuple is ignored.
                let rebuild_join =
                    |(join_on, _): (Vec<(Column, Column)>, Vec<SortOptions>)| {
                        let join = HashJoinExec::try_new(
                            left.clone(),
                            right.clone(),
                            join_on,
                            filter.clone(),
                            join_type,
                            PartitionMode::Partitioned,
                            *null_equals_null,
                        )?;
                        Ok(Arc::new(join) as Arc<dyn ExecutionPlan>)
                    };
                let reordered = reorder_partitioned_join_keys(
                    requirements.plan.clone(),
                    &parent_keys,
                    on,
                    vec![],
                    &rebuild_join,
                )?;
                Some(reordered)
            }
            PartitionMode::CollectLeft => {
                let right_request = match join_type {
                    JoinType::Left
                    | JoinType::LeftSemi
                    | JoinType::LeftAnti
                    | JoinType::Full => None,
                    JoinType::RightSemi | JoinType::RightAnti => {
                        Some(parent_keys.clone())
                    }
                    JoinType::Inner | JoinType::Right => {
                        // Shift the requirement by the number of left-side fields.
                        let left_width = left.schema().fields().len();
                        shift_right_required(&parent_keys, left_width)
                    }
                };
                // Only the right side can receive the pushed-down requirement.
                Some(forward(vec![None, right_request]))
            }
            PartitionMode::Auto => Some(reset()),
        }
    } else if let Some(CrossJoinExec { left, .. }) =
        node.downcast_ref::<CrossJoinExec>()
    {
        // Only the right side can receive the pushed-down requirement.
        let left_width = left.schema().fields().len();
        let right_request = shift_right_required(&parent_keys, left_width);
        Some(forward(vec![None, right_request]))
    } else if let Some(SortMergeJoinExec {
        left,
        right,
        on,
        join_type,
        sort_options,
        null_equals_null,
        ..
    }) = node.downcast_ref::<SortMergeJoinExec>()
    {
        let rebuild_join =
            |(join_on, join_sort_options): (Vec<(Column, Column)>, Vec<SortOptions>)| {
                let join = SortMergeJoinExec::try_new(
                    left.clone(),
                    right.clone(),
                    join_on,
                    *join_type,
                    join_sort_options,
                    *null_equals_null,
                )?;
                Ok(Arc::new(join) as Arc<dyn ExecutionPlan>)
            };
        let reordered = reorder_partitioned_join_keys(
            requirements.plan.clone(),
            &parent_keys,
            on,
            sort_options.clone(),
            &rebuild_join,
        )?;
        Some(reordered)
    } else if let Some(aggregate_exec) = node.downcast_ref::<AggregateExec>() {
        if parent_keys.is_empty() {
            // Nothing is required by the parent: keep everything unchanged.
            None
        } else if matches!(aggregate_exec.mode, AggregateMode::FinalPartitioned) {
            Some(reorder_aggregate_keys(
                requirements.plan.clone(),
                &parent_keys,
                aggregate_exec,
            )?)
        } else {
            Some(reset())
        }
    } else if let Some(ProjectionExec { expr, .. }) =
        node.downcast_ref::<ProjectionExec>()
    {
        // Translate the requirement into the columns that exist before the projection
        // (new name -> original column) and push that down instead.
        let mapped = map_columns_before_projection(&parent_keys, expr);
        if mapped.len() != parent_keys.len() {
            // The mapped list differs in length from the requirement: can not satisfy.
            Some(reset())
        } else {
            Some(forward(vec![Some(mapped)]))
        }
    } else if node.is::<RepartitionExec>()
        || node.is::<CoalescePartitionsExec>()
        || node.is::<WindowAggExec>()
    {
        Some(reset())
    } else {
        // Default: every child is requested the parent's requirement as-is.
        let child_count = requirements.plan.children().len();
        Some(forward(vec![Some(parent_keys.clone()); child_count]))
    };

    Ok(match adjusted {
        Some(updated) => Transformed::Yes(updated),
        None => Transformed::No(requirements),
    })
}