in datafusion/optimizer/src/optimize_projections/mod.rs [112:424]
/// Rewrites `plan` (and, recursively, its inputs) so that it only computes the
/// columns its parent actually needs.
///
/// # Parameters
///
/// - `plan`: The logical plan node to rewrite; it is consumed.
/// - `config`: Optimizer configuration, forwarded to recursive calls.
/// - `indices`: Column indices of `plan`'s output that the parent requires.
///
/// # Returns
///
/// The rewritten plan wrapped in [`Transformed`]. `Projection`, `Aggregate`,
/// `Window` and `TableScan` nodes are rebuilt directly; every other node type
/// has per-child requirements computed and its children rewritten in place,
/// after which its schema is recomputed if any child changed.
///
/// # Errors
///
/// Propagates any error raised while rebuilding a node or rewriting a child,
/// and returns an internal error when the number of computed child
/// requirements does not match the number of plan inputs.
fn optimize_projections(
    plan: LogicalPlan,
    config: &dyn OptimizerConfig,
    indices: RequiredIndices,
) -> Result<Transformed<LogicalPlan>> {
    // Recursively rewrite any nodes that may be able to avoid computation given
    // their parents' required indices.
    match plan {
        LogicalPlan::Projection(proj) => {
            return merge_consecutive_projections(proj)?.transform_data(|proj| {
                rewrite_projection_given_requirements(proj, config, &indices)
            })
        }
        LogicalPlan::Aggregate(aggregate) => {
            // Split parent requirements to GROUP BY and aggregate sections:
            let n_group_exprs = aggregate.group_expr_len()?;
            // Offset aggregate indices so that they point to valid indices at
            // `aggregate.aggr_expr`:
            let (group_by_reqs, aggregate_reqs) = indices.split_off(n_group_exprs);

            // Get absolutely necessary GROUP BY fields:
            let group_by_expr_existing = aggregate
                .group_expr
                .iter()
                .map(|group_by_expr| group_by_expr.schema_name().to_string())
                .collect::<Vec<_>>();

            let new_group_bys = if let Some(simplest_groupby_indices) =
                get_required_group_by_exprs_indices(
                    aggregate.input.schema(),
                    &group_by_expr_existing,
                ) {
                // Some of the fields in the GROUP BY may be required by the
                // parent even if these fields are unnecessary in terms of
                // functional dependency.
                group_by_reqs
                    .append(&simplest_groupby_indices)
                    .get_at_indices(&aggregate.group_expr)
            } else {
                aggregate.group_expr
            };

            // Only use the absolutely necessary aggregate expressions required
            // by the parent:
            let mut new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr);

            // Aggregations always need at least one aggregate expression.
            // With a nested count, we don't require any column as input, but
            // still need to create a correct aggregate, which may be optimized
            // out later. As an example, consider the following query:
            //
            // SELECT count(*) FROM (SELECT count(*) FROM [...])
            //
            // which always returns 1.
            if new_aggr_expr.is_empty()
                && new_group_bys.is_empty()
                && !aggregate.aggr_expr.is_empty()
            {
                // Keep only the old, first aggregate expression. The vector is
                // known to be non-empty here, so truncating leaves exactly one.
                new_aggr_expr = aggregate.aggr_expr;
                new_aggr_expr.truncate(1);
            }

            let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter());
            let schema = aggregate.input.schema();
            let necessary_indices =
                RequiredIndices::new().with_exprs(schema, all_exprs_iter);
            // Computed before the recursive call because `aggregate.input` is
            // moved into it below.
            let necessary_exprs = necessary_indices.get_required_exprs(schema);

            return optimize_projections(
                Arc::unwrap_or_clone(aggregate.input),
                config,
                necessary_indices,
            )?
            .transform_data(|aggregate_input| {
                // Simplify the input of the aggregation by adding a projection so
                // that its input only contains absolutely necessary columns for
                // the aggregate expressions. Note that necessary_indices refer to
                // fields in `aggregate.input.schema()`.
                add_projection_on_top_if_helpful(aggregate_input, necessary_exprs)
            })?
            .map_data(|aggregate_input| {
                // Create a new aggregate plan with the updated input and only the
                // absolutely necessary fields:
                Aggregate::try_new(
                    Arc::new(aggregate_input),
                    new_group_bys,
                    new_aggr_expr,
                )
                .map(LogicalPlan::Aggregate)
            });
        }
        LogicalPlan::Window(window) => {
            let input_schema = Arc::clone(window.input.schema());
            // Split parent requirements to child and window expression sections:
            let n_input_fields = input_schema.fields().len();
            // Offset window expression indices so that they point to valid
            // indices at `window.window_expr`:
            let (child_reqs, window_reqs) = indices.split_off(n_input_fields);

            // Only use window expressions that are absolutely necessary according
            // to parent requirements:
            let new_window_expr = window_reqs.get_at_indices(&window.window_expr);

            // Get all the required column indices at the input, either by the
            // parent or window expression requirements.
            let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr);

            return optimize_projections(
                Arc::unwrap_or_clone(window.input),
                config,
                required_indices.clone(),
            )?
            .transform_data(|window_child| {
                if new_window_expr.is_empty() {
                    // When no window expression is necessary, use the input
                    // directly. Dropping the `Window` node changes the plan even
                    // if the child itself was left untouched, so report the
                    // rewrite; otherwise ancestors would skip recomputing their
                    // schema and keep referring to the removed window columns.
                    Ok(Transformed::yes(window_child))
                } else {
                    // Calculate required expressions at the input of the window.
                    // Please note that we use `input_schema`, because `required_indices`
                    // refers to that schema
                    let required_exprs =
                        required_indices.get_required_exprs(&input_schema);
                    let window_child =
                        add_projection_on_top_if_helpful(window_child, required_exprs)?
                            .data;
                    Window::try_new(new_window_expr, Arc::new(window_child))
                        .map(LogicalPlan::Window)
                        .map(Transformed::yes)
                }
            });
        }
        LogicalPlan::TableScan(table_scan) => {
            let TableScan {
                table_name,
                source,
                projection,
                filters,
                fetch,
                projected_schema: _,
            } = table_scan;

            // Get indices referred to in the original (schema with all fields)
            // given projected indices.
            let projection = match &projection {
                Some(projection) => indices.into_mapped_indices(|idx| projection[idx]),
                None => indices.into_inner(),
            };
            // The projected schema is discarded above and rebuilt by `try_new`
            // from the new projection.
            return TableScan::try_new(
                table_name,
                source,
                Some(projection),
                filters,
                fetch,
            )
            .map(LogicalPlan::TableScan)
            .map(Transformed::yes);
        }
        // Other node types are handled below
        _ => {}
    };

    // For other plan node types, calculate indices for columns they use and
    // try to rewrite their children
    let mut child_required_indices: Vec<RequiredIndices> = match &plan {
        LogicalPlan::Sort(_)
        | LogicalPlan::Filter(_)
        | LogicalPlan::Repartition(_)
        | LogicalPlan::Union(_)
        | LogicalPlan::SubqueryAlias(_)
        | LogicalPlan::Distinct(Distinct::On(_)) => {
            // Pass index requirements from the parent as well as column indices
            // that appear in this plan's expressions to its child. All these
            // operators benefit from "small" inputs, so the projection_beneficial
            // flag is `true`.
            plan.inputs()
                .into_iter()
                .map(|input| {
                    indices
                        .clone()
                        .with_projection_beneficial()
                        .with_plan_exprs(&plan, input.schema())
                })
                .collect::<Result<_>>()?
        }
        LogicalPlan::Limit(_) => {
            // Pass index requirements from the parent as well as column indices
            // that appear in this plan's expressions to its child. These operators
            // do not benefit from "small" inputs, so the projection_beneficial
            // flag is `false`.
            plan.inputs()
                .into_iter()
                .map(|input| indices.clone().with_plan_exprs(&plan, input.schema()))
                .collect::<Result<_>>()?
        }
        LogicalPlan::Copy(_)
        | LogicalPlan::Ddl(_)
        | LogicalPlan::Dml(_)
        | LogicalPlan::Explain(_)
        | LogicalPlan::Analyze(_)
        | LogicalPlan::Subquery(_)
        | LogicalPlan::Statement(_)
        | LogicalPlan::Distinct(Distinct::All(_)) => {
            // These plans require all their fields, and their children should
            // be treated as final plans -- otherwise, we may have a schema
            // mismatch.
            // TODO: For some subquery variants (e.g. a subquery arising from an
            //       EXISTS expression), we may not need to require all indices.
            plan.inputs()
                .into_iter()
                .map(RequiredIndices::new_for_all_exprs)
                .collect()
        }
        LogicalPlan::Extension(extension) => {
            let Some(necessary_children_indices) =
                extension.node.necessary_children_exprs(indices.indices())
            else {
                // Requirements from parent cannot be routed down to user defined logical plan safely
                return Ok(Transformed::no(plan));
            };
            let children = extension.node.inputs();
            if children.len() != necessary_children_indices.len() {
                return internal_err!("Inconsistent length between children and necessary children indices. \
                Make sure `.necessary_children_exprs` implementation of the `UserDefinedLogicalNode` is \
                consistent with actual children length for the node.");
            }
            // Pair each child with the indices the extension node reported for
            // it, then add the columns used by the node's own expressions.
            children
                .into_iter()
                .zip(necessary_children_indices)
                .map(|(child, necessary_indices)| {
                    RequiredIndices::new_from_indices(necessary_indices)
                        .with_plan_exprs(&plan, child.schema())
                })
                .collect::<Result<Vec<_>>>()?
        }
        LogicalPlan::EmptyRelation(_)
        | LogicalPlan::RecursiveQuery(_)
        | LogicalPlan::Values(_)
        | LogicalPlan::DescribeTable(_) => {
            // These operators have no inputs, so stop the optimization process.
            return Ok(Transformed::no(plan));
        }
        LogicalPlan::Join(join) => {
            let left_len = join.left.schema().fields().len();
            let (left_req_indices, right_req_indices) =
                split_join_requirements(left_len, indices, &join.join_type);
            let left_indices =
                left_req_indices.with_plan_exprs(&plan, join.left.schema())?;
            let right_indices =
                right_req_indices.with_plan_exprs(&plan, join.right.schema())?;
            // Joins benefit from "small" input tables (lower memory usage).
            // Therefore, each child benefits from projection:
            vec![
                left_indices.with_projection_beneficial(),
                right_indices.with_projection_beneficial(),
            ]
        }
        // these nodes are explicitly rewritten in the match statement above
        LogicalPlan::Projection(_)
        | LogicalPlan::Aggregate(_)
        | LogicalPlan::Window(_)
        | LogicalPlan::TableScan(_) => {
            return internal_err!(
                "OptimizeProjection: should have handled in the match statement above"
            );
        }
        LogicalPlan::Unnest(Unnest {
            dependency_indices, ..
        }) => {
            // Parent requirements are not forwarded here; the child is asked
            // for the indices recorded in `dependency_indices`.
            vec![RequiredIndices::new_from_indices(
                dependency_indices.clone(),
            )]
        }
    };

    // Required indices are currently ordered (child0, child1, ...)
    // but the loop pops off the last element, so we need to reverse the order
    child_required_indices.reverse();
    if child_required_indices.len() != plan.inputs().len() {
        return internal_err!(
            "OptimizeProjection: child_required_indices length mismatch with plan inputs"
        );
    }

    // Rewrite children of the plan
    let transformed_plan = plan.map_children(|child| {
        let required_indices = child_required_indices.pop().ok_or_else(|| {
            internal_datafusion_err!(
                "Unexpected number of required_indices in OptimizeProjections rule"
            )
        })?;

        // Both values are read before `required_indices` and `child` are moved
        // into the recursive call.
        let projection_beneficial = required_indices.projection_beneficial();
        let project_exprs = required_indices.get_required_exprs(child.schema());

        optimize_projections(child, config, required_indices)?.transform_data(
            |new_input| {
                if projection_beneficial {
                    add_projection_on_top_if_helpful(new_input, project_exprs)
                } else {
                    Ok(Transformed::no(new_input))
                }
            },
        )
    })?;

    // If any of the children are transformed, we need to potentially update the plan's schema
    if transformed_plan.transformed {
        transformed_plan.map_data(|plan| plan.recompute_schema())
    } else {
        Ok(transformed_plan)
    }
}