fn optimize_projections()

in datafusion/optimizer/src/optimize_projections/mod.rs [112:424]


/// Rewrites `plan` so that it (and, recursively, its inputs) avoids computing
/// anything that is not needed to satisfy `indices`, the requirements coming
/// from the parent of `plan`.
///
/// The function works in two phases:
///
/// 1. `Projection`, `Aggregate`, `Window` and `TableScan` nodes are rewritten
///    directly in the first `match`; each of these arms returns early.
/// 2. For every other node type, the requirements of each child are computed
///    in the second `match`, the children are rewritten recursively through
///    `map_children`, and the schema of the node is recomputed if any child
///    was reported as transformed.
///
/// # Arguments
///
/// * `plan` - The plan node to rewrite; it is consumed.
/// * `config` - Optimizer configuration, forwarded to the recursive calls and
///   to `rewrite_projection_given_requirements`.
/// * `indices` - The requirements of the parent on the output of `plan`.
///
/// # Errors
///
/// Returns an internal error when the number of computed child requirements
/// does not match the number of inputs of `plan` (for an `Extension` node this
/// is checked against the result of `necessary_children_exprs`). Errors from
/// the helpers called here and from the recursive calls are propagated with
/// `?`.
fn optimize_projections(
    plan: LogicalPlan,
    config: &dyn OptimizerConfig,
    indices: RequiredIndices,
) -> Result<Transformed<LogicalPlan>> {
    // Recursively rewrite any nodes that may be able to avoid computation given
    // their parents' required indices.
    //
    // Every arm except the catch-all returns, so `plan` is only moved on paths
    // that leave the function and remains usable after this `match`.
    match plan {
        LogicalPlan::Projection(proj) => {
            // First merge this projection with a directly following one, then
            // rewrite the result according to the parent requirements.
            return merge_consecutive_projections(proj)?.transform_data(|proj| {
                rewrite_projection_given_requirements(proj, config, &indices)
            })
        }
        LogicalPlan::Aggregate(aggregate) => {
            // Split parent requirements to GROUP BY and aggregate sections:
            let n_group_exprs = aggregate.group_expr_len()?;
            // Offset aggregate indices so that they point to valid indices at
            // `aggregate.aggr_expr`:
            let (group_by_reqs, aggregate_reqs) = indices.split_off(n_group_exprs);

            // Get absolutely necessary GROUP BY fields:
            let group_by_expr_existing = aggregate
                .group_expr
                .iter()
                .map(|group_by_expr| group_by_expr.schema_name().to_string())
                .collect::<Vec<_>>();

            let new_group_bys = if let Some(simplest_groupby_indices) =
                get_required_group_by_exprs_indices(
                    aggregate.input.schema(),
                    &group_by_expr_existing,
                ) {
                // Some of the fields in the GROUP BY may be required by the
                // parent even if these fields are unnecessary in terms of
                // functional dependency.
                group_by_reqs
                    .append(&simplest_groupby_indices)
                    .get_at_indices(&aggregate.group_expr)
            } else {
                // No reduced set of GROUP BY expressions is available: keep
                // all of them (the vector is moved out of `aggregate`).
                aggregate.group_expr
            };

            // Only use the absolutely necessary aggregate expressions required
            // by the parent:
            let mut new_aggr_expr = aggregate_reqs.get_at_indices(&aggregate.aggr_expr);

            // Aggregations always need at least one aggregate expression.
            // With a nested count, we don't require any column as input, but
            // still need to create a correct aggregate, which may be optimized
            // out later. As an example, consider the following query:
            //
            // SELECT count(*) FROM (SELECT count(*) FROM [...])
            //
            // which always returns 1.
            if new_aggr_expr.is_empty()
                && new_group_bys.is_empty()
                && !aggregate.aggr_expr.is_empty()
            {
                // take the old, first aggregate expression
                new_aggr_expr = aggregate.aggr_expr;
                // The vector is known to be non-empty here, so `resize_with`
                // can only shrink it to one element; the fill closure is never
                // invoked, hence `unreachable!()`.
                new_aggr_expr.resize_with(1, || unreachable!());
            }

            // Requirements on the aggregate input: everything referenced by
            // the retained GROUP BY and aggregate expressions.
            let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter());
            let schema = aggregate.input.schema();
            let necessary_indices =
                RequiredIndices::new().with_exprs(schema, all_exprs_iter);
            // Computed before the recursive call because `necessary_indices`
            // and `aggregate.input` are both moved into it.
            let necessary_exprs = necessary_indices.get_required_exprs(schema);

            return optimize_projections(
                Arc::unwrap_or_clone(aggregate.input),
                config,
                necessary_indices,
            )?
            .transform_data(|aggregate_input| {
                // Simplify the input of the aggregation by adding a projection so
                // that its input only contains absolutely necessary columns for
                // the aggregate expressions. Note that necessary_indices refer to
                // fields in `aggregate.input.schema()`.
                add_projection_on_top_if_helpful(aggregate_input, necessary_exprs)
            })?
            .map_data(|aggregate_input| {
                // Create a new aggregate plan with the updated input and only the
                // absolutely necessary fields:
                Aggregate::try_new(
                    Arc::new(aggregate_input),
                    new_group_bys,
                    new_aggr_expr,
                )
                .map(LogicalPlan::Aggregate)
            });
        }
        LogicalPlan::Window(window) => {
            // Keep our own handle on the input schema: `window.input` is moved
            // into the recursive call below, but the schema is needed after it.
            let input_schema = Arc::clone(window.input.schema());
            // Split parent requirements to child and window expression sections:
            let n_input_fields = input_schema.fields().len();
            // Offset window expression indices so that they point to valid
            // indices at `window.window_expr`:
            let (child_reqs, window_reqs) = indices.split_off(n_input_fields);

            // Only use window expressions that are absolutely necessary according
            // to parent requirements:
            let new_window_expr = window_reqs.get_at_indices(&window.window_expr);

            // Get all the required column indices at the input, either by the
            // parent or window expression requirements.
            let required_indices = child_reqs.with_exprs(&input_schema, &new_window_expr);

            return optimize_projections(
                Arc::unwrap_or_clone(window.input),
                config,
                // Cloned because `required_indices` is used again in the
                // closure below.
                required_indices.clone(),
            )?
            .transform_data(|window_child| {
                if new_window_expr.is_empty() {
                    // When no window expression is necessary, use the input directly:
                    // NOTE(review): the `Window` node is dropped on this path,
                    // yet the closure reports `Transformed::no`; whether the
                    // result is flagged as changed then depends on how
                    // `transform_data` combines flags (not visible here) --
                    // confirm this is intended.
                    Ok(Transformed::no(window_child))
                } else {
                    // Calculate required expressions at the input of the window.
                    // Please note that we use `input_schema`, because `required_indices`
                    // refers to that schema
                    let required_exprs =
                        required_indices.get_required_exprs(&input_schema);
                    // Only the plan is kept (`.data`); the flag returned by the
                    // helper is not needed since this path always reports
                    // `Transformed::yes`.
                    let window_child =
                        add_projection_on_top_if_helpful(window_child, required_exprs)?
                            .data;
                    Window::try_new(new_window_expr, Arc::new(window_child))
                        .map(LogicalPlan::Window)
                        .map(Transformed::yes)
                }
            });
        }
        LogicalPlan::TableScan(table_scan) => {
            // The old projected schema is discarded; the scan is rebuilt below
            // through `TableScan::try_new` with the new projection.
            let TableScan {
                table_name,
                source,
                projection,
                filters,
                fetch,
                projected_schema: _,
            } = table_scan;

            // Get indices referred to in the original (schema with all fields)
            // given projected indices.
            // NOTE(review): `projection[idx]` is a direct index; presumably
            // every required index is within the existing projection -- confirm.
            let projection = match &projection {
                Some(projection) => indices.into_mapped_indices(|idx| projection[idx]),
                None => indices.into_inner(),
            };
            // This arm always reports `Transformed::yes`, regardless of whether
            // the new projection differs from the previous one.
            return TableScan::try_new(
                table_name,
                source,
                Some(projection),
                filters,
                fetch,
            )
            .map(LogicalPlan::TableScan)
            .map(Transformed::yes);
        }
        // Other node types are handled below
        _ => {}
    };

    // For other plan node types, calculate indices for columns they use and
    // try to rewrite their children
    //
    // The match borrows `plan` because the plan itself is consumed later by
    // `map_children`. The resulting vector holds one entry per child, in child
    // order.
    let mut child_required_indices: Vec<RequiredIndices> = match &plan {
        LogicalPlan::Sort(_)
        | LogicalPlan::Filter(_)
        | LogicalPlan::Repartition(_)
        | LogicalPlan::Union(_)
        | LogicalPlan::SubqueryAlias(_)
        | LogicalPlan::Distinct(Distinct::On(_)) => {
            // Pass index requirements from the parent as well as column indices
            // that appear in this plan's expressions to its child. All these
            // operators benefit from "small" inputs, so the projection_beneficial
            // flag is `true`.
            plan.inputs()
                .into_iter()
                .map(|input| {
                    indices
                        .clone()
                        .with_projection_beneficial()
                        .with_plan_exprs(&plan, input.schema())
                })
                .collect::<Result<_>>()?
        }
        LogicalPlan::Limit(_) => {
            // Pass index requirements from the parent as well as column indices
            // that appear in this plan's expressions to its child. These operators
            // do not benefit from "small" inputs, so the projection_beneficial
            // flag is `false`.
            plan.inputs()
                .into_iter()
                .map(|input| indices.clone().with_plan_exprs(&plan, input.schema()))
                .collect::<Result<_>>()?
        }
        LogicalPlan::Copy(_)
        | LogicalPlan::Ddl(_)
        | LogicalPlan::Dml(_)
        | LogicalPlan::Explain(_)
        | LogicalPlan::Analyze(_)
        | LogicalPlan::Subquery(_)
        | LogicalPlan::Statement(_)
        | LogicalPlan::Distinct(Distinct::All(_)) => {
            // These plans require all their fields, and their children should
            // be treated as final plans -- otherwise, we may have a schema
            // mismatch.
            // TODO: For some subquery variants (e.g. a subquery arising from an
            //       EXISTS expression), we may not need to require all indices.
            plan.inputs()
                .into_iter()
                .map(RequiredIndices::new_for_all_exprs)
                .collect()
        }
        LogicalPlan::Extension(extension) => {
            // Ask the user defined node how the parent requirements map onto
            // each of its children.
            let Some(necessary_children_indices) =
                extension.node.necessary_children_exprs(indices.indices())
            else {
                // Requirements from parent cannot be routed down to user defined logical plan safely
                return Ok(Transformed::no(plan));
            };
            let children = extension.node.inputs();
            // Guard against an inconsistent user implementation before zipping
            // children with their requirements.
            if children.len() != necessary_children_indices.len() {
                return internal_err!("Inconsistent length between children and necessary children indices. \
                Make sure `.necessary_children_exprs` implementation of the `UserDefinedLogicalNode` is \
                consistent with actual children length for the node.");
            }
            // Each child needs what the node reported for it plus the columns
            // referenced by the node's own expressions.
            children
                .into_iter()
                .zip(necessary_children_indices)
                .map(|(child, necessary_indices)| {
                    RequiredIndices::new_from_indices(necessary_indices)
                        .with_plan_exprs(&plan, child.schema())
                })
                .collect::<Result<Vec<_>>>()?
        }
        LogicalPlan::EmptyRelation(_)
        | LogicalPlan::RecursiveQuery(_)
        | LogicalPlan::Values(_)
        | LogicalPlan::DescribeTable(_) => {
            // These operators have no inputs, so stop the optimization process.
            return Ok(Transformed::no(plan));
        }
        LogicalPlan::Join(join) => {
            // Distribute the parent requirements between the two join sides
            // (the split depends on the join type), then add the columns used
            // by the join's own expressions to each side.
            let left_len = join.left.schema().fields().len();
            let (left_req_indices, right_req_indices) =
                split_join_requirements(left_len, indices, &join.join_type);
            let left_indices =
                left_req_indices.with_plan_exprs(&plan, join.left.schema())?;
            let right_indices =
                right_req_indices.with_plan_exprs(&plan, join.right.schema())?;
            // Joins benefit from "small" input tables (lower memory usage).
            // Therefore, each child benefits from projection:
            vec![
                left_indices.with_projection_beneficial(),
                right_indices.with_projection_beneficial(),
            ]
        }
        // these nodes are explicitly rewritten in the match statement above
        // (every one of those arms returns, so reaching this arm indicates a bug)
        LogicalPlan::Projection(_)
        | LogicalPlan::Aggregate(_)
        | LogicalPlan::Window(_)
        | LogicalPlan::TableScan(_) => {
            return internal_err!(
                "OptimizeProjection: should have handled in the match statement above"
            );
        }
        LogicalPlan::Unnest(Unnest {
            dependency_indices, ..
        }) => {
            // The child requirements are taken from the node's
            // `dependency_indices` as-is.
            // NOTE(review): the parent's `indices` are not consulted in this
            // arm -- confirm that requiring all `dependency_indices` is the
            // intended behavior.
            vec![RequiredIndices::new_from_indices(
                dependency_indices.clone(),
            )]
        }
    };

    // Required indices are currently ordered (child0, child1, ...)
    // but the loop pops off the last element, so we need to reverse the order
    child_required_indices.reverse();
    // Sanity check: there must be exactly one requirement set per child.
    if child_required_indices.len() != plan.inputs().len() {
        return internal_err!(
            "OptimizeProjection: child_required_indices length mismatch with plan inputs"
        );
    }

    // Rewrite children of the plan
    let transformed_plan = plan.map_children(|child| {
        // Take the requirements belonging to this child (see the reversal
        // above).
        let required_indices = child_required_indices.pop().ok_or_else(|| {
            internal_datafusion_err!(
                "Unexpected number of required_indices in OptimizeProjections rule"
            )
        })?;

        // Both values are extracted up front because `child` and
        // `required_indices` are moved into the recursive call below.
        let projection_beneficial = required_indices.projection_beneficial();
        let project_exprs = required_indices.get_required_exprs(child.schema());

        optimize_projections(child, config, required_indices)?.transform_data(
            |new_input| {
                // Only put a projection on top of the rewritten child when the
                // requirements were marked as benefiting from one.
                if projection_beneficial {
                    add_projection_on_top_if_helpful(new_input, project_exprs)
                } else {
                    Ok(Transformed::no(new_input))
                }
            },
        )
    })?;

    // If any of the children are transformed, we need to potentially update the plan's schema
    if transformed_plan.transformed {
        transformed_plan.map_data(|plan| plan.recompute_schema())
    } else {
        Ok(transformed_plan)
    }
}