in datafusion/core/src/physical_optimizer/sort_pushdown.rs [195:283]
/// Decides which sort requirements can be pushed from `plan`'s parent down to
/// each of `plan`'s children.
///
/// Returns:
/// - `Ok(None)` when the parent's requirement cannot (or should not) be pushed
///   below `plan`;
/// - `Ok(Some(reqs))` otherwise. For the union and order-maintaining branches
///   `reqs` has one entry per child of `plan`; for window operators it has a
///   single entry. Each entry is the requirement to impose on the corresponding
///   child, where `None` means no requirement.
///   For `SortMergeJoinExec` the result is whatever
///   `try_pushdown_requirements_to_join` produces.
///
/// # Errors
/// Returns `DataFusionError::Plan` when `plan` is a `SortMergeJoinExec` and
/// `parent_required` is `None`. Also propagates errors from
/// `shift_right_required` and `try_pushdown_requirements_to_join`.
fn pushdown_requirement_to_children(
    plan: &Arc<dyn ExecutionPlan>,
    parent_required: Option<&[PhysicalSortRequirement]>,
) -> Result<Option<Vec<Option<Vec<PhysicalSortRequirement>>>>> {
    const ERR_MSG: &str = "Expects parent requirement to contain something";
    let err = || DataFusionError::Plan(ERR_MSG.to_string());
    let maintains_input_order = plan.maintains_input_order();
    if is_window(plan) {
        // Only the first child / first required input ordering is consulted here.
        // Reconcile the parent's requirement with the ordering the window itself
        // requests from that child.
        let required_input_ordering = plan.required_input_ordering();
        let request_child = required_input_ordering[0].as_deref();
        let child_plan = plan.children()[0].clone();
        match determine_children_requirement(parent_required, request_child, child_plan) {
            RequirementsCompatibility::Satisfy => {
                Ok(Some(vec![request_child.map(|r| r.to_vec())]))
            }
            RequirementsCompatibility::Compatible(adjusted) => Ok(Some(vec![adjusted])),
            RequirementsCompatibility::NonCompatible => Ok(None),
        }
    } else if is_union(plan) {
        // UnionExec has no real sort requirements for its inputs. Forward the
        // parent's requirement unchanged to every child, so that unnecessary
        // descendant SortExecs under the UnionExec can be corrected.
        Ok(Some(vec![
            parent_required.map(|elem| elem.to_vec());
            plan.children().len()
        ]))
    } else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
        // SortMergeJoinExec: the requirement is only pushed down when all of its
        // expressions can be attributed to a single side of the join.
        let left_columns_len = smj.left.schema().fields().len();
        // Unwrap once; both join sides below reuse this slice.
        let required = parent_required.ok_or_else(err)?;
        let parent_required_expr =
            PhysicalSortRequirement::to_sort_exprs(required.iter().cloned());
        match expr_source_sides(&parent_required_expr, smj.join_type, left_columns_len) {
            Some(JoinSide::Left) => try_pushdown_requirements_to_join(
                smj,
                parent_required,
                parent_required_expr,
                JoinSide::Left,
            ),
            Some(JoinSide::Right) => {
                // Number of join-output columns that precede the right child's
                // columns. The requirement is shifted by this amount before being
                // handed to the right side.
                let right_offset =
                    smj.schema().fields().len() - smj.right.schema().fields().len();
                let new_right_required = shift_right_required(required, right_offset)?;
                let new_right_required_expr = PhysicalSortRequirement::to_sort_exprs(
                    new_right_required.iter().cloned(),
                );
                try_pushdown_requirements_to_join(
                    smj,
                    parent_required,
                    new_right_required_expr,
                    JoinSide::Right,
                )
            }
            // Cannot decide which side the expressions come from, so do not push down.
            _ => Ok(None),
        }
    } else if !maintains_input_order.iter().any(|&o| o)
        || plan.as_any().is::<RepartitionExec>()
        || plan.as_any().is::<FilterExec>()
        // TODO: Add support for Projection push down
        || plan.as_any().is::<ProjectionExec>()
        || is_limit(plan)
        || plan.as_any().is::<HashJoinExec>()
    {
        // Do not push down when the plan maintains none of its input orderings.
        // This includes leaf nodes, whose `maintains_input_order` is empty.
        // RepartitionExec is never pushed through, even though it can maintain
        // input ordering when input_partition=1, because pushing down is not
        // beneficial there.
        Ok(None)
    } else {
        // Forward the parent's requirement only to the children whose ordering
        // this plan maintains; the other children get no requirement.
        Ok(Some(
            maintains_input_order
                .iter()
                .map(|&flag| {
                    if flag {
                        parent_required.map(|elem| elem.to_vec())
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>(),
        ))
    }
}