fn create_initial_plan()

in datafusion/core/src/physical_planner.rs [525:1261]


    fn create_initial_plan<'a>(
        &'a self,
        logical_plan: &'a LogicalPlan,
        session_state: &'a SessionState,
    ) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
        async move {
            let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan {
                LogicalPlan::TableScan(TableScan {
                    source,
                    projection,
                    filters,
                    fetch,
                    ..
                }) => {
                    let source = source_as_provider(source)?;
                    // Remove all qualifiers from the scan as the provider
                    // doesn't know (nor should care) how the relation was
                    // referred to in the query
                    let filters = unnormalize_cols(filters.iter().cloned());
                    let unaliased: Vec<Expr> = filters.into_iter().map(unalias).collect();
                    source.scan(session_state, projection.as_ref(), &unaliased, *fetch).await
                }
                LogicalPlan::Dml(DmlStatement {
                    table_name,
                    op: WriteOp::InsertInto,
                    input,
                    ..
                }) => {
                    let name = table_name.table();
                    let schema = session_state.schema_for_ref(table_name)?;
                    if let Some(provider) = schema.table(name).await {
                        let input_exec = self.create_initial_plan(input, session_state).await?;
                        provider.insert_into(session_state, input_exec, false).await
                    } else {
                        return Err(DataFusionError::Execution(format!(
                            "Table '{table_name}' does not exist"
                        )));
                    }
                }
                LogicalPlan::Dml(DmlStatement {
                    table_name,
                    op: WriteOp::InsertOverwrite,
                    input,
                    ..
                }) => {
                    let name = table_name.table();
                    let schema = session_state.schema_for_ref(table_name)?;
                    if let Some(provider) = schema.table(name).await {
                        let input_exec = self.create_initial_plan(input, session_state).await?;
                        provider.insert_into(session_state, input_exec, true).await
                    } else {
                        return Err(DataFusionError::Execution(format!(
                            "Table '{table_name}' does not exist"
                        )));
                    }
                }
                LogicalPlan::Values(Values {
                    values,
                    schema,
                }) => {
                    let exec_schema = schema.as_ref().to_owned().into();
                    let exprs = values.iter()
                        .map(|row| {
                            row.iter().map(|expr| {
                                self.create_physical_expr(
                                    expr,
                                    schema,
                                    &exec_schema,
                                    session_state,
                                )
                            })
                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
                        })
                        .collect::<Result<Vec<_>>>()?;
                    let value_exec = ValuesExec::try_new(
                        SchemaRef::new(exec_schema),
                        exprs,
                    )?;
                    Ok(Arc::new(value_exec))
                }
                LogicalPlan::Window(Window {
                    input, window_expr, ..
                }) => {
                    if window_expr.is_empty() {
                        return Err(DataFusionError::Internal(
                            "Impossibly got empty window expression".to_owned(),
                        ));
                    }

                    let input_exec = self.create_initial_plan(input, session_state).await?;

                    // at this moment we are guaranteed by the logical planner
                    // to have all the window_expr to have equal sort key
                    let partition_keys = window_expr_common_partition_keys(window_expr)?;

                    let can_repartition = !partition_keys.is_empty()
                        && session_state.config().target_partitions() > 1
                        && session_state.config().repartition_window_functions();

                    let physical_partition_keys = if can_repartition
                    {
                        partition_keys
                            .iter()
                            .map(|e| {
                                self.create_physical_expr(
                                    e,
                                    input.schema(),
                                    &input_exec.schema(),
                                    session_state,
                                )
                            })
                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?
                    } else {
                        vec![]
                    };

                    let get_sort_keys = |expr: &Expr| match expr {
                        Expr::WindowFunction(WindowFunction{
                            ref partition_by,
                            ref order_by,
                            ..
                        }) => generate_sort_key(partition_by, order_by),
                        Expr::Alias(Alias{expr,..}) => {
                            // Convert &Box<T> to &T
                            match &**expr {
                                Expr::WindowFunction(WindowFunction{
                                    ref partition_by,
                                    ref order_by,
                                    ..}) => generate_sort_key(partition_by, order_by),
                                _ => unreachable!(),
                            }
                        }
                        _ => unreachable!(),
                    };
                    let sort_keys = get_sort_keys(&window_expr[0])?;
                    if window_expr.len() > 1 {
                        debug_assert!(
                            window_expr[1..]
                                .iter()
                                .all(|expr| get_sort_keys(expr).unwrap() == sort_keys),
                            "all window expressions shall have the same sort keys, as guaranteed by logical planning"
                        );
                    }

                    let logical_input_schema = input.schema();
                    let physical_input_schema = input_exec.schema();
                    let window_expr = window_expr
                        .iter()
                        .map(|e| {
                            create_window_expr(
                                e,
                                logical_input_schema,
                                &physical_input_schema,
                                session_state.execution_props(),
                            )
                        })
                        .collect::<Result<Vec<_>>>()?;

                    let uses_bounded_memory = window_expr
                        .iter()
                        .all(|e| e.uses_bounded_memory());
                    // If all window expressions can run with bounded memory,
                    // choose the bounded window variant:
                    Ok(if uses_bounded_memory {
                        Arc::new(BoundedWindowAggExec::try_new(
                            window_expr,
                            input_exec,
                            physical_input_schema,
                            physical_partition_keys,
                            PartitionSearchMode::Sorted,
                        )?)
                    } else {
                        Arc::new(WindowAggExec::try_new(
                            window_expr,
                            input_exec,
                            physical_input_schema,
                            physical_partition_keys,
                        )?)
                    })
                }
                LogicalPlan::Aggregate(Aggregate {
                    input,
                    group_expr,
                    aggr_expr,
                    ..
                }) => {
                    // Initially need to perform the aggregate and then merge the partitions
                    let input_exec = self.create_initial_plan(input, session_state).await?;
                    let physical_input_schema = input_exec.schema();
                    let logical_input_schema = input.as_ref().schema();

                    let groups = self.create_grouping_physical_expr(
                        group_expr,
                        logical_input_schema,
                        &physical_input_schema,
                        session_state)?;

                    let agg_filter = aggr_expr
                        .iter()
                        .map(|e| {
                            create_aggregate_expr_and_maybe_filter(
                                e,
                                logical_input_schema,
                                &physical_input_schema,
                                session_state.execution_props(),
                            )
                        })
                        .collect::<Result<Vec<_>>>()?;

                    let (aggregates, filters, order_bys) : (Vec<_>, Vec<_>, Vec<_>) = multiunzip(agg_filter.into_iter());

                    let initial_aggr = Arc::new(AggregateExec::try_new(
                        AggregateMode::Partial,
                        groups.clone(),
                        aggregates.clone(),
                        filters.clone(),
                        order_bys,
                        input_exec,
                        physical_input_schema.clone(),
                    )?);

                    // update group column indices based on partial aggregate plan evaluation
                    let final_group: Vec<Arc<dyn PhysicalExpr>> = initial_aggr.output_group_expr();

                    let can_repartition = !groups.is_empty()
                        && session_state.config().target_partitions() > 1
                        && session_state.config().repartition_aggregations();

                    // Some aggregators may be modified during initialization for
                    // optimization purposes. For example, a FIRST_VALUE may turn
                    // into a LAST_VALUE with the reverse ordering requirement.
                    // To reflect such changes to subsequent stages, use the updated
                    // `AggregateExpr`/`PhysicalSortExpr` objects.
                    let updated_aggregates = initial_aggr.aggr_expr.clone();
                    let updated_order_bys = initial_aggr.order_by_expr.clone();

                    let (initial_aggr, next_partition_mode): (
                        Arc<dyn ExecutionPlan>,
                        AggregateMode,
                    ) = if can_repartition {
                        // construct a second aggregation with 'AggregateMode::FinalPartitioned'
                        (initial_aggr, AggregateMode::FinalPartitioned)
                    } else {
                        // construct a second aggregation, keeping the final column name equal to the
                        // first aggregation and the expressions corresponding to the respective aggregate
                        (initial_aggr, AggregateMode::Final)
                    };

                    let final_grouping_set = PhysicalGroupBy::new_single(
                        final_group
                            .iter()
                            .enumerate()
                            .map(|(i, expr)| (expr.clone(), groups.expr()[i].1.clone()))
                            .collect()
                    );

                    Ok(Arc::new(AggregateExec::try_new(
                        next_partition_mode,
                        final_grouping_set,
                        updated_aggregates,
                        filters,
                        updated_order_bys,
                        initial_aggr,
                        physical_input_schema.clone(),
                    )?))
                }
                LogicalPlan::Projection(Projection { input, expr, .. }) => {
                    let input_exec = self.create_initial_plan(input, session_state).await?;
                    let input_schema = input.as_ref().schema();

                    let physical_exprs = expr
                        .iter()
                        .map(|e| {
                            // For projections, SQL planner and logical plan builder may convert user
                            // provided expressions into logical Column expressions if their results
                            // are already provided from the input plans. Because we work with
                            // qualified columns in logical plane, derived columns involve operators or
                            // functions will contain qualifiers as well. This will result in logical
                            // columns with names like `SUM(t1.c1)`, `t1.c1 + t1.c2`, etc.
                            //
                            // If we run these logical columns through physical_name function, we will
                            // get physical names with column qualifiers, which violates DataFusion's
                            // field name semantics. To account for this, we need to derive the
                            // physical name from physical input instead.
                            //
                            // This depends on the invariant that logical schema field index MUST match
                            // with physical schema field index.
                            let physical_name = if let Expr::Column(col) = e {
                                match input_schema.index_of_column(col) {
                                    Ok(idx) => {
                                        // index physical field using logical field index
                                        Ok(input_exec.schema().field(idx).name().to_string())
                                    }
                                    // logical column is not a derived column, safe to pass along to
                                    // physical_name
                                    Err(_) => physical_name(e),
                                }
                            } else {
                                physical_name(e)
                            };

                            tuple_err((
                                self.create_physical_expr(
                                    e,
                                    input_schema,
                                    &input_exec.schema(),
                                    session_state,
                                ),
                                physical_name,
                            ))
                        })
                        .collect::<Result<Vec<_>>>()?;

                    Ok(Arc::new(ProjectionExec::try_new(
                        physical_exprs,
                        input_exec,
                    )?))
                }
                LogicalPlan::Filter(filter) => {
                    let physical_input = self.create_initial_plan(&filter.input, session_state).await?;
                    let input_schema = physical_input.as_ref().schema();
                    let input_dfschema = filter.input.schema();

                    let runtime_expr = self.create_physical_expr(
                        &filter.predicate,
                        input_dfschema,
                        &input_schema,
                        session_state,
                    )?;
                    Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?))
                }
                LogicalPlan::Union(Union { inputs, schema }) => {
                    let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?;

                    if schema.fields().len() < physical_plans[0].schema().fields().len() {
                        // `schema` could be a subset of the child schema. For example
                        // for query "select count(*) from (select a from t union all select a from t)"
                        // `schema` is empty but child schema contains one field `a`.
                        Ok(Arc::new(UnionExec::try_new_with_schema(physical_plans, schema.clone())?))
                    } else {
                        Ok(Arc::new(UnionExec::new(physical_plans)))
                    }
                }
                LogicalPlan::Repartition(Repartition {
                    input,
                    partitioning_scheme,
                }) => {
                    let physical_input = self.create_initial_plan(input, session_state).await?;
                    let input_schema = physical_input.schema();
                    let input_dfschema = input.as_ref().schema();
                    let physical_partitioning = match partitioning_scheme {
                        LogicalPartitioning::RoundRobinBatch(n) => {
                            Partitioning::RoundRobinBatch(*n)
                        }
                        LogicalPartitioning::Hash(expr, n) => {
                            let runtime_expr = expr
                                .iter()
                                .map(|e| {
                                    self.create_physical_expr(
                                        e,
                                        input_dfschema,
                                        &input_schema,
                                        session_state,
                                    )
                                })
                                .collect::<Result<Vec<_>>>()?;
                            Partitioning::Hash(runtime_expr, *n)
                        }
                        LogicalPartitioning::DistributeBy(_) => {
                            return Err(DataFusionError::NotImplemented("Physical plan does not support DistributeBy partitioning".to_string()));
                        }
                    };
                    Ok(Arc::new(RepartitionExec::try_new(
                        physical_input,
                        physical_partitioning,
                    )?))
                }
                LogicalPlan::Sort(Sort { expr, input, fetch, .. }) => {
                    let physical_input = self.create_initial_plan(input, session_state).await?;
                    let input_schema = physical_input.as_ref().schema();
                    let input_dfschema = input.as_ref().schema();
                    let sort_expr = expr
                        .iter()
                        .map(|e| create_physical_sort_expr(
                            e,
                            input_dfschema,
                            &input_schema,
                            session_state.execution_props(),
                        ))
                        .collect::<Result<Vec<_>>>()?;
                    let new_sort = SortExec::new(sort_expr, physical_input)
                        .with_fetch(*fetch);
                    Ok(Arc::new(new_sort))
                }
                LogicalPlan::Join(Join {
                    left,
                    right,
                    on: keys,
                    filter,
                    join_type,
                    null_equals_null,
                    schema: join_schema,
                    ..
                }) => {
                    let null_equals_null = *null_equals_null;

                    // If join has expression equijoin keys, add physical projecton.
                    let has_expr_join_key = keys.iter().any(|(l, r)| {
                        !(matches!(l, Expr::Column(_))
                            && matches!(r, Expr::Column(_)))
                    });
                    if has_expr_join_key {
                        let left_keys = keys
                            .iter()
                            .map(|(l, _r)| l)
                            .cloned()
                            .collect::<Vec<_>>();
                        let right_keys = keys
                            .iter()
                            .map(|(_l, r)| r)
                            .cloned()
                            .collect::<Vec<_>>();
                        let (left, right, column_on, added_project) = {
                            let (left, left_col_keys, left_projected) =
                                wrap_projection_for_join_if_necessary(
                                    left_keys.as_slice(),
                                    left.as_ref().clone(),
                                )?;
                            let (right, right_col_keys, right_projected) =
                                wrap_projection_for_join_if_necessary(
                                    &right_keys,
                                    right.as_ref().clone(),
                                )?;
                            (
                                left,
                                right,
                                (left_col_keys, right_col_keys),
                                left_projected || right_projected,
                            )
                        };

                        let join_plan =
                            LogicalPlan::Join(Join::try_new_with_project_input(
                                logical_plan,
                                Arc::new(left),
                                Arc::new(right),
                                column_on,
                            )?);

                        // Remove temporary projected columns
                        let join_plan = if added_project {
                            let final_join_result = join_schema
                                .fields()
                                .iter()
                                .map(|field| {
                                    Expr::Column(field.qualified_column())
                                })
                                .collect::<Vec<_>>();
                            let projection =
                                Projection::try_new(
                                    final_join_result,
                                    Arc::new(join_plan),
                                )?;
                            LogicalPlan::Projection(projection)
                        } else {
                            join_plan
                        };

                        return self
                            .create_initial_plan(&join_plan, session_state)
                            .await;
                    }

                    // All equi-join keys are columns now, create physical join plan
                    let left_right = self.create_initial_plan_multi([left.as_ref(), right.as_ref()], session_state).await?;
                    let [physical_left, physical_right]: [Arc<dyn ExecutionPlan>; 2] = left_right.try_into().map_err(|_| DataFusionError::Internal("`create_initial_plan_multi` is broken".to_string()))?;
                    let left_df_schema = left.schema();
                    let right_df_schema = right.schema();
                    let join_on = keys
                        .iter()
                        .map(|(l, r)| {
                            let l = l.try_into_col()?;
                            let r = r.try_into_col()?;
                            Ok((
                                Column::new(&l.name, left_df_schema.index_of_column(&l)?),
                                Column::new(&r.name, right_df_schema.index_of_column(&r)?),
                            ))
                        })
                        .collect::<Result<join_utils::JoinOn>>()?;

                    let join_filter = match filter {
                        Some(expr) => {
                            // Extract columns from filter expression and saved in a HashSet
                            let cols = expr.to_columns()?;

                            // Collect left & right field indices, the field indices are sorted in ascending order
                            let left_field_indices = cols.iter()
                                .filter_map(|c| match left_df_schema.index_of_column(c) {
                                    Ok(idx) => Some(idx),
                                    _ => None,
                                }).sorted()
                                .collect::<Vec<_>>();
                            let right_field_indices = cols.iter()
                                .filter_map(|c| match right_df_schema.index_of_column(c) {
                                    Ok(idx) => Some(idx),
                                    _ => None,
                                }).sorted()
                                .collect::<Vec<_>>();

                            // Collect DFFields and Fields required for intermediate schemas
                            let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) = left_field_indices.clone()
                                .into_iter()
                                .map(|i| (
                                    left_df_schema.field(i).clone(),
                                    physical_left.schema().field(i).clone(),
                                ))
                                .chain(
                                    right_field_indices.clone()
                                        .into_iter()
                                        .map(|i| (
                                            right_df_schema.field(i).clone(),
                                            physical_right.schema().field(i).clone(),
                                        ))
                                )
                                .unzip();

                            // Construct intermediate schemas used for filtering data and
                            // convert logical expression to physical according to filter schema
                            let filter_df_schema = DFSchema::new_with_metadata(filter_df_fields, HashMap::new())?;
                            let filter_schema = Schema::new_with_metadata(filter_fields, HashMap::new());
                            let filter_expr = create_physical_expr(
                                expr,
                                &filter_df_schema,
                                &filter_schema,
                                session_state.execution_props(),
                            )?;
                            let column_indices = join_utils::JoinFilter::build_column_indices(left_field_indices, right_field_indices);

                            Some(join_utils::JoinFilter::new(
                                filter_expr,
                                column_indices,
                                filter_schema,
                            ))
                        }
                        _ => None
                    };

                    let prefer_hash_join = session_state.config_options().optimizer.prefer_hash_join;
                    if join_on.is_empty() {
                        // there is no equal join condition, use the nested loop join
                        // TODO optimize the plan, and use the config of `target_partitions` and `repartition_joins`
                        Ok(Arc::new(NestedLoopJoinExec::try_new(
                            physical_left,
                            physical_right,
                            join_filter,
                            join_type,
                        )?))
                    } else if session_state.config().target_partitions() > 1
                        && session_state.config().repartition_joins()
                        && !prefer_hash_join
                    {
                        // Use SortMergeJoin if hash join is not preferred
                        // Sort-Merge join support currently is experimental
                        if join_filter.is_some() {
                            // TODO SortMergeJoinExec need to support join filter
                            Err(DataFusionError::NotImplemented("SortMergeJoinExec does not support join_filter now.".to_string()))
                        } else {
                            let join_on_len = join_on.len();
                            Ok(Arc::new(SortMergeJoinExec::try_new(
                                physical_left,
                                physical_right,
                                join_on,
                                *join_type,
                                vec![SortOptions::default(); join_on_len],
                                null_equals_null,
                            )?))
                        }
                    } else if session_state.config().target_partitions() > 1
                        && session_state.config().repartition_joins()
                        && prefer_hash_join {
                         let partition_mode = {
                            if session_state.config().collect_statistics() {
                                PartitionMode::Auto
                            } else {
                                PartitionMode::Partitioned
                            }
                         };
                        Ok(Arc::new(HashJoinExec::try_new(
                            physical_left,
                            physical_right,
                            join_on,
                            join_filter,
                            join_type,
                            partition_mode,
                            null_equals_null,
                        )?))
                    } else {
                        Ok(Arc::new(HashJoinExec::try_new(
                            physical_left,
                            physical_right,
                            join_on,
                            join_filter,
                            join_type,
                            PartitionMode::CollectLeft,
                            null_equals_null,
                        )?))
                    }
                }
                LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
                    let left_right = self.create_initial_plan_multi([left.as_ref(), right.as_ref()], session_state).await?;
                    let [left, right]: [Arc<dyn ExecutionPlan>; 2] = left_right.try_into().map_err(|_| DataFusionError::Internal("`create_initial_plan_multi` is broken".to_string()))?;
                    Ok(Arc::new(CrossJoinExec::new(left, right)))
                }
                LogicalPlan::Subquery(_) => todo!(),
                LogicalPlan::EmptyRelation(EmptyRelation {
                    produce_one_row,
                    schema,
                }) => Ok(Arc::new(EmptyExec::new(
                    *produce_one_row,
                    SchemaRef::new(schema.as_ref().to_owned().into()),
                ))),
                LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
                    self.create_initial_plan(input, session_state).await
                }
                LogicalPlan::Limit(Limit { input, skip, fetch, .. }) => {
                    let input = self.create_initial_plan(input, session_state).await?;

                    // GlobalLimitExec requires a single partition for input
                    let input = if input.output_partitioning().partition_count() == 1 {
                        input
                    } else {
                        // Apply a LocalLimitExec to each partition. The optimizer will also insert
                        // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec
                        if let Some(fetch) = fetch {
                            Arc::new(LocalLimitExec::new(input, *fetch + skip))
                        } else {
                            input
                        }
                    };

                    Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
                }
                LogicalPlan::Unnest(Unnest { input, column, schema }) => {
                    let input = self.create_initial_plan(input, session_state).await?;
                    let column_exec = schema.index_of_column(column)
                        .map(|idx| Column::new(&column.name, idx))?;
                    let schema = SchemaRef::new(schema.as_ref().to_owned().into());
                    Ok(Arc::new(UnnestExec::new(input, column_exec, schema)))
                }
                LogicalPlan::Ddl(ddl) => {
                    // There is no default plan for DDl statements --
                    // it must be handled at a higher level (so that
                    // the appropriate table can be registered with
                    // the context)
                    let name = ddl.name();
                    Err(DataFusionError::NotImplemented(
                        format!("Unsupported logical plan: {name}")
                    ))
                }
                LogicalPlan::Prepare(_) => {
                    // There is no default plan for "PREPARE" -- it must be
                    // handled at a higher level (so that the appropriate
                    // statement can be prepared)
                    Err(DataFusionError::NotImplemented(
                        "Unsupported logical plan: Prepare".to_string(),
                    ))
                }
                LogicalPlan::Dml(_) => {
                    // DataFusion is a read-only query engine, but also a library, so consumers may implement this
                    Err(DataFusionError::NotImplemented(
                        "Unsupported logical plan: Dml".to_string(),
                    ))
                }
                LogicalPlan::Statement(statement) => {
                    // DataFusion is a read-only query engine, but also a library, so consumers may implement this
                    let name = statement.name();
                    Err(DataFusionError::NotImplemented(
                        format!("Unsupported logical plan: Statement({name})")
                    ))
                }
                LogicalPlan::DescribeTable(_) => {
                    Err(DataFusionError::Internal(
                        "Unsupported logical plan: DescribeTable must be root of the plan".to_string(),
                    ))
                }
                LogicalPlan::Explain(_) => Err(DataFusionError::Internal(
                    "Unsupported logical plan: Explain must be root of the plan".to_string(),
                )),
                LogicalPlan::Distinct(_) => {
                    Err(DataFusionError::Internal(
                        "Unsupported logical plan: Distinct should be replaced to Aggregate".to_string(),
                    ))
                }
                LogicalPlan::Analyze(_) => Err(DataFusionError::Internal(
                    "Unsupported logical plan: Analyze must be root of the plan".to_string(),
                )),
                LogicalPlan::Extension(e) => {
                    let physical_inputs = self.create_initial_plan_multi(e.node.inputs(), session_state).await?;

                    let mut maybe_plan = None;
                    for planner in &self.extension_planners {
                        if maybe_plan.is_some() {
                            break;
                        }

                        let logical_input = e.node.inputs();
                        maybe_plan = planner.plan_extension(
                            self,
                            e.node.as_ref(),
                            &logical_input,
                            &physical_inputs,
                            session_state,
                        ).await?;
                    }

                    let plan = match maybe_plan {
                        Some(v) => Ok(v),
                        _ => plan_err!("No installed planner was able to convert the custom node to an execution plan: {:?}", e.node)
                    }?;

                    // Ensure the ExecutionPlan's schema matches the
                    // declared logical schema to catch and warn about
                    // logic errors when creating user defined plans.
                    if !e.node.schema().matches_arrow_schema(&plan.schema()) {
                        plan_err!(
                            "Extension planner for {:?} created an ExecutionPlan with mismatched schema. \
                            LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}",
                            e.node, e.node.schema(), plan.schema()
                        )
                    } else {
                        Ok(plan)
                    }
                }
            };
            exec_plan
        }.boxed()
    }