in datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs [197:350]
fn pushdown_requirement_to_children(
plan: &Arc<dyn ExecutionPlan>,
parent_required: &LexRequirement,
) -> Result<Option<Vec<Option<LexRequirement>>>> {
let maintains_input_order = plan.maintains_input_order();
if is_window(plan) {
let required_input_ordering = plan.required_input_ordering();
let request_child = required_input_ordering[0].clone().unwrap_or_default();
let child_plan = plan.children().swap_remove(0);
match determine_children_requirement(parent_required, &request_child, child_plan)
{
RequirementsCompatibility::Satisfy => {
let req = (!request_child.is_empty())
.then(|| LexRequirement::new(request_child.to_vec()));
Ok(Some(vec![req]))
}
RequirementsCompatibility::Compatible(adjusted) => {
// If parent requirements are more specific than output ordering
// of the window plan, then we can deduce that the parent expects
// an ordering from the columns created by window functions. If
// that's the case, we block the pushdown of sort operation.
if !plan
.equivalence_properties()
.ordering_satisfy_requirement(parent_required)
{
return Ok(None);
}
Ok(Some(vec![adjusted]))
}
RequirementsCompatibility::NonCompatible => Ok(None),
}
} else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
let sort_req = LexRequirement::from(
sort_exec
.properties()
.output_ordering()
.cloned()
.unwrap_or_else(LexOrdering::default),
);
if sort_exec
.properties()
.eq_properties
.requirements_compatible(parent_required, &sort_req)
{
debug_assert!(!parent_required.is_empty());
Ok(Some(vec![Some(LexRequirement::new(
parent_required.to_vec(),
))]))
} else {
Ok(None)
}
} else if plan.fetch().is_some()
&& plan.supports_limit_pushdown()
&& plan
.maintains_input_order()
.iter()
.all(|maintain| *maintain)
{
let output_req = LexRequirement::from(
plan.properties()
.output_ordering()
.cloned()
.unwrap_or_else(LexOrdering::default),
);
// Push down through operator with fetch when:
// - requirement is aligned with output ordering
// - it preserves ordering during execution
if plan
.properties()
.eq_properties
.requirements_compatible(parent_required, &output_req)
{
let req = (!parent_required.is_empty())
.then(|| LexRequirement::new(parent_required.to_vec()));
Ok(Some(vec![req]))
} else {
Ok(None)
}
} else if is_union(plan) {
// UnionExec does not have real sort requirements for its input. Here we change the adjusted_request_ordering to UnionExec's output ordering and
// propagate the sort requirements down to correct the unnecessary descendant SortExec under the UnionExec
let req = (!parent_required.is_empty()).then(|| parent_required.clone());
Ok(Some(vec![req; plan.children().len()]))
} else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
// If the current plan is SortMergeJoinExec
let left_columns_len = smj.left().schema().fields().len();
let parent_required_expr = LexOrdering::from(parent_required.clone());
match expr_source_side(
parent_required_expr.as_ref(),
smj.join_type(),
left_columns_len,
) {
Some(JoinSide::Left) => try_pushdown_requirements_to_join(
smj,
parent_required,
parent_required_expr.as_ref(),
JoinSide::Left,
),
Some(JoinSide::Right) => {
let right_offset =
smj.schema().fields.len() - smj.right().schema().fields.len();
let new_right_required =
shift_right_required(parent_required, right_offset)?;
let new_right_required_expr = LexOrdering::from(new_right_required);
try_pushdown_requirements_to_join(
smj,
parent_required,
new_right_required_expr.as_ref(),
JoinSide::Right,
)
}
_ => {
// Can not decide the expr side for SortMergeJoinExec, can not push down
Ok(None)
}
}
} else if maintains_input_order.is_empty()
|| !maintains_input_order.iter().any(|o| *o)
|| plan.as_any().is::<RepartitionExec>()
|| plan.as_any().is::<FilterExec>()
// TODO: Add support for Projection push down
|| plan.as_any().is::<ProjectionExec>()
|| pushdown_would_violate_requirements(parent_required, plan.as_ref())
{
// If the current plan is a leaf node or can not maintain any of the input ordering, can not pushed down requirements.
// For RepartitionExec, we always choose to not push down the sort requirements even the RepartitionExec(input_partition=1) could maintain input ordering.
// Pushing down is not beneficial
Ok(None)
} else if is_sort_preserving_merge(plan) {
let new_ordering = LexOrdering::from(parent_required.clone());
let mut spm_eqs = plan.equivalence_properties().clone();
// Sort preserving merge will have new ordering, one requirement above is pushed down to its below.
spm_eqs = spm_eqs.with_reorder(new_ordering);
// Do not push-down through SortPreservingMergeExec when
// ordering requirement invalidates requirement of sort preserving merge exec.
if !spm_eqs.ordering_satisfy(&plan.output_ordering().cloned().unwrap_or_default())
{
Ok(None)
} else {
// Can push-down through SortPreservingMergeExec, because parent requirement is finer
// than SortPreservingMergeExec output ordering.
let req = (!parent_required.is_empty())
.then(|| LexRequirement::new(parent_required.to_vec()));
Ok(Some(vec![req]))
}
} else if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
handle_hash_join(hash_join, parent_required)
} else {
handle_custom_pushdown(plan, parent_required, maintains_input_order)
}
// TODO: Add support for Projection push down
}