in datafusion/core/src/physical_planner.rs [525:1261]
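// Recursively creates the initial physical plan for `logical_plan` (the
// plan before any physical optimizer passes run). Returns a `BoxFuture`
// because an `async fn` cannot recurse directly, hence the `.boxed()` body.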
fn create_initial_plan<'a>(
&'a self,
logical_plan: &'a LogicalPlan,
session_state: &'a SessionState,
) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
async move {
let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan {
LogicalPlan::TableScan(TableScan {
source,
projection,
filters,
fetch,
..
}) => {
let source = source_as_provider(source)?;
// Remove all qualifiers from the scan as the provider
// doesn't know (nor should care) how the relation was
// referred to in the query
let filters = unnormalize_cols(filters.iter().cloned());
let unaliased: Vec<Expr> = filters.into_iter().map(unalias).collect();
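// Delegate to the table provider, pushing down the projection,
// filters, and limit (`fetch`)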
source.scan(session_state, projection.as_ref(), &unaliased, *fetch).await
}
LogicalPlan::Dml(DmlStatement {
table_name,
op: WriteOp::InsertInto,
input,
..
}) => {
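// Resolve the target table through the session's schema and delegate
// to the provider's `insert_into` with overwrite disabled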
let name = table_name.table();
let schema = session_state.schema_for_ref(table_name)?;
if let Some(provider) = schema.table(name).await {
let input_exec = self.create_initial_plan(input, session_state).await?;
provider.insert_into(session_state, input_exec, false).await
} else {
return Err(DataFusionError::Execution(format!(
"Table '{table_name}' does not exist"
)));
}
}
LogicalPlan::Dml(DmlStatement {
table_name,
op: WriteOp::InsertOverwrite,
input,
..
}) => {
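// Same as InsertInto above, but with overwrite enabled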
let name = table_name.table();
let schema = session_state.schema_for_ref(table_name)?;
if let Some(provider) = schema.table(name).await {
let input_exec = self.create_initial_plan(input, session_state).await?;
provider.insert_into(session_state, input_exec, true).await
} else {
return Err(DataFusionError::Execution(format!(
"Table '{table_name}' does not exist"
)));
}
}
LogicalPlan::Values(Values {
values,
schema,
}) => {
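// Convert the logical schema to an Arrow schema and plan each literal
// expression of every VALUES row against it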
let exec_schema = schema.as_ref().to_owned().into();
let exprs = values.iter()
.map(|row| {
row.iter().map(|expr| {
self.create_physical_expr(
expr,
schema,
&exec_schema,
session_state,
)
})
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
})
.collect::<Result<Vec<_>>>()?;
let value_exec = ValuesExec::try_new(
SchemaRef::new(exec_schema),
exprs,
)?;
Ok(Arc::new(value_exec))
}
LogicalPlan::Window(Window {
input, window_expr, ..
}) => {
if window_expr.is_empty() {
return Err(DataFusionError::Internal(
"Impossibly got empty window expression".to_owned(),
));
}
let input_exec = self.create_initial_plan(input, session_state).await?;
// At this point the logical planner guarantees that all
// window_expr share the same sort key
let partition_keys = window_expr_common_partition_keys(window_expr)?;
let can_repartition = !partition_keys.is_empty()
&& session_state.config().target_partitions() > 1
&& session_state.config().repartition_window_functions();
let physical_partition_keys = if can_repartition {
partition_keys
.iter()
.map(|e| {
self.create_physical_expr(
e,
input.schema(),
&input_exec.schema(),
session_state,
)
})
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?
} else {
vec![]
};
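// Extract the sort key (partition-by + order-by) of a window expression.
// The logical planner only produces `WindowFunction` expressions
// (possibly aliased) here, hence the `unreachable!()` arms below.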
let get_sort_keys = |expr: &Expr| match expr {
Expr::WindowFunction(WindowFunction{
ref partition_by,
ref order_by,
..
}) => generate_sort_key(partition_by, order_by),
Expr::Alias(Alias{expr,..}) => {
// Convert &Box<T> to &T
match &**expr {
Expr::WindowFunction(WindowFunction{
ref partition_by,
ref order_by,
..}) => generate_sort_key(partition_by, order_by),
_ => unreachable!(),
}
}
_ => unreachable!(),
};
let sort_keys = get_sort_keys(&window_expr[0])?;
if window_expr.len() > 1 {
debug_assert!(
window_expr[1..]
.iter()
.all(|expr| get_sort_keys(expr).unwrap() == sort_keys),
"all window expressions shall have the same sort keys, as guaranteed by logical planning"
);
}
let logical_input_schema = input.schema();
let physical_input_schema = input_exec.schema();
let window_expr = window_expr
.iter()
.map(|e| {
create_window_expr(
e,
logical_input_schema,
&physical_input_schema,
session_state.execution_props(),
)
})
.collect::<Result<Vec<_>>>()?;
let uses_bounded_memory = window_expr
.iter()
.all(|e| e.uses_bounded_memory());
// If all window expressions can run with bounded memory,
// choose the bounded window variant:
Ok(if uses_bounded_memory {
Arc::new(BoundedWindowAggExec::try_new(
window_expr,
input_exec,
physical_input_schema,
physical_partition_keys,
PartitionSearchMode::Sorted,
)?)
} else {
Arc::new(WindowAggExec::try_new(
window_expr,
input_exec,
physical_input_schema,
physical_partition_keys,
)?)
})
}
LogicalPlan::Aggregate(Aggregate {
input,
group_expr,
aggr_expr,
..
}) => {
// First perform a partial aggregate on each partition, then merge the partial results
let input_exec = self.create_initial_plan(input, session_state).await?;
let physical_input_schema = input_exec.schema();
let logical_input_schema = input.as_ref().schema();
let groups = self.create_grouping_physical_expr(
group_expr,
logical_input_schema,
&physical_input_schema,
session_state)?;
let agg_filter = aggr_expr
.iter()
.map(|e| {
create_aggregate_expr_and_maybe_filter(
e,
logical_input_schema,
&physical_input_schema,
session_state.execution_props(),
)
})
.collect::<Result<Vec<_>>>()?;
let (aggregates, filters, order_bys): (Vec<_>, Vec<_>, Vec<_>) = multiunzip(agg_filter.into_iter());
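// Phase 1: a partial aggregate that runs independently on each input
// partition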
let initial_aggr = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
groups.clone(),
aggregates.clone(),
filters.clone(),
order_bys,
input_exec,
physical_input_schema.clone(),
)?);
// update group column indices based on partial aggregate plan evaluation
let final_group: Vec<Arc<dyn PhysicalExpr>> = initial_aggr.output_group_expr();
let can_repartition = !groups.is_empty()
&& session_state.config().target_partitions() > 1
&& session_state.config().repartition_aggregations();
// Some aggregators may be modified during initialization for
// optimization purposes. For example, a FIRST_VALUE may turn
// into a LAST_VALUE with the reverse ordering requirement.
// To reflect such changes to subsequent stages, use the updated
// `AggregateExpr`/`PhysicalSortExpr` objects.
let updated_aggregates = initial_aggr.aggr_expr.clone();
let updated_order_bys = initial_aggr.order_by_expr.clone();
let (initial_aggr, next_partition_mode): (
Arc<dyn ExecutionPlan>,
AggregateMode,
) = if can_repartition {
// construct a second aggregation with 'AggregateMode::FinalPartitioned'
(initial_aggr, AggregateMode::FinalPartitioned)
} else {
// construct a second aggregation, keeping the final column name equal to the
// first aggregation and the expressions corresponding to the respective aggregate
(initial_aggr, AggregateMode::Final)
};
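// Phase 2 groups on the columns produced by the partial aggregate,
// matched by position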
let final_grouping_set = PhysicalGroupBy::new_single(
final_group
.iter()
.enumerate()
.map(|(i, expr)| (expr.clone(), groups.expr()[i].1.clone()))
.collect()
);
Ok(Arc::new(AggregateExec::try_new(
next_partition_mode,
final_grouping_set,
updated_aggregates,
filters,
updated_order_bys,
initial_aggr,
physical_input_schema.clone(),
)?))
}
LogicalPlan::Projection(Projection { input, expr, .. }) => {
let input_exec = self.create_initial_plan(input, session_state).await?;
let input_schema = input.as_ref().schema();
let physical_exprs = expr
.iter()
.map(|e| {
// For projections, the SQL planner and logical plan builder may convert
// user-provided expressions into logical Column expressions if their
// results are already provided by the input plans. Because we work with
// qualified columns in the logical plan, derived columns involving
// operators or functions will contain qualifiers as well. This results in
// logical columns with names like `SUM(t1.c1)`, `t1.c1 + t1.c2`, etc.
//
// If we run these logical columns through physical_name function, we will
// get physical names with column qualifiers, which violates DataFusion's
// field name semantics. To account for this, we need to derive the
// physical name from physical input instead.
//
// This depends on the invariant that logical schema field index MUST match
// with physical schema field index.
let physical_name = if let Expr::Column(col) = e {
match input_schema.index_of_column(col) {
Ok(idx) => {
// index physical field using logical field index
Ok(input_exec.schema().field(idx).name().to_string())
}
// logical column is not a derived column, safe to pass along to
// physical_name
Err(_) => physical_name(e),
}
} else {
physical_name(e)
};
tuple_err((
self.create_physical_expr(
e,
input_schema,
&input_exec.schema(),
session_state,
),
physical_name,
))
})
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(ProjectionExec::try_new(
physical_exprs,
input_exec,
)?))
}
LogicalPlan::Filter(filter) => {
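// Plan the predicate against the physical input schema and wrap the
// input in a FilterExec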
let physical_input = self.create_initial_plan(&filter.input, session_state).await?;
let input_schema = physical_input.as_ref().schema();
let input_dfschema = filter.input.schema();
let runtime_expr = self.create_physical_expr(
&filter.predicate,
input_dfschema,
&input_schema,
session_state,
)?;
Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?))
}
LogicalPlan::Union(Union { inputs, schema }) => {
let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?;
if schema.fields().len() < physical_plans[0].schema().fields().len() {
// `schema` could be a subset of the child schema. For example,
// for the query "select count(*) from (select a from t union all select a from t)"
// `schema` is empty but the child schema contains one field `a`.
Ok(Arc::new(UnionExec::try_new_with_schema(physical_plans, schema.clone())?))
} else {
Ok(Arc::new(UnionExec::new(physical_plans)))
}
}
LogicalPlan::Repartition(Repartition {
input,
partitioning_scheme,
}) => {
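// Map the logical partitioning scheme to a physical `Partitioning`,
// planning any hash expressions against the physical input schema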
let physical_input = self.create_initial_plan(input, session_state).await?;
let input_schema = physical_input.schema();
let input_dfschema = input.as_ref().schema();
let physical_partitioning = match partitioning_scheme {
LogicalPartitioning::RoundRobinBatch(n) => {
Partitioning::RoundRobinBatch(*n)
}
LogicalPartitioning::Hash(expr, n) => {
let runtime_expr = expr
.iter()
.map(|e| {
self.create_physical_expr(
e,
input_dfschema,
&input_schema,
session_state,
)
})
.collect::<Result<Vec<_>>>()?;
Partitioning::Hash(runtime_expr, *n)
}
LogicalPartitioning::DistributeBy(_) => {
return Err(DataFusionError::NotImplemented("Physical plan does not support DistributeBy partitioning".to_string()));
}
};
Ok(Arc::new(RepartitionExec::try_new(
physical_input,
physical_partitioning,
)?))
}
LogicalPlan::Sort(Sort { expr, input, fetch, .. }) => {
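// Plan each sort expression, then build a SortExec; a pushed-down
// limit becomes the sort's `fetch`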
let physical_input = self.create_initial_plan(input, session_state).await?;
let input_schema = physical_input.as_ref().schema();
let input_dfschema = input.as_ref().schema();
let sort_expr = expr
.iter()
.map(|e| create_physical_sort_expr(
e,
input_dfschema,
&input_schema,
session_state.execution_props(),
))
.collect::<Result<Vec<_>>>()?;
let new_sort = SortExec::new(sort_expr, physical_input)
.with_fetch(*fetch);
Ok(Arc::new(new_sort))
}
LogicalPlan::Join(Join {
left,
right,
on: keys,
filter,
join_type,
null_equals_null,
schema: join_schema,
..
}) => {
let null_equals_null = *null_equals_null;
// If the join has expression equijoin keys, add a physical projection.
let has_expr_join_key = keys.iter().any(|(l, r)| {
!(matches!(l, Expr::Column(_))
&& matches!(r, Expr::Column(_)))
});
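// e.g. for `ON t1.a + 1 = t2.b`, project `t1.a + 1` as a column on the
// left input, join on plain columns, then project the temporary
// columns away again below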
if has_expr_join_key {
let left_keys = keys
.iter()
.map(|(l, _r)| l)
.cloned()
.collect::<Vec<_>>();
let right_keys = keys
.iter()
.map(|(_l, r)| r)
.cloned()
.collect::<Vec<_>>();
let (left, right, column_on, added_project) = {
let (left, left_col_keys, left_projected) =
wrap_projection_for_join_if_necessary(
left_keys.as_slice(),
left.as_ref().clone(),
)?;
let (right, right_col_keys, right_projected) =
wrap_projection_for_join_if_necessary(
&right_keys,
right.as_ref().clone(),
)?;
(
left,
right,
(left_col_keys, right_col_keys),
left_projected || right_projected,
)
};
let join_plan =
LogicalPlan::Join(Join::try_new_with_project_input(
logical_plan,
Arc::new(left),
Arc::new(right),
column_on,
)?);
// Remove temporary projected columns
let join_plan = if added_project {
let final_join_result = join_schema
.fields()
.iter()
.map(|field| {
Expr::Column(field.qualified_column())
})
.collect::<Vec<_>>();
let projection =
Projection::try_new(
final_join_result,
Arc::new(join_plan),
)?;
LogicalPlan::Projection(projection)
} else {
join_plan
};
return self
.create_initial_plan(&join_plan, session_state)
.await;
}
// All equi-join keys are columns now, create physical join plan
let left_right = self.create_initial_plan_multi([left.as_ref(), right.as_ref()], session_state).await?;
let [physical_left, physical_right]: [Arc<dyn ExecutionPlan>; 2] = left_right.try_into().map_err(|_| DataFusionError::Internal("`create_initial_plan_multi` is broken".to_string()))?;
let left_df_schema = left.schema();
let right_df_schema = right.schema();
let join_on = keys
.iter()
.map(|(l, r)| {
let l = l.try_into_col()?;
let r = r.try_into_col()?;
Ok((
Column::new(&l.name, left_df_schema.index_of_column(&l)?),
Column::new(&r.name, right_df_schema.index_of_column(&r)?),
))
})
.collect::<Result<join_utils::JoinOn>>()?;
let join_filter = match filter {
Some(expr) => {
// Extract columns from the filter expression and save them in a HashSet
let cols = expr.to_columns()?;
// Collect the left & right field indices, sorted in ascending order
let left_field_indices = cols.iter()
.filter_map(|c| match left_df_schema.index_of_column(c) {
Ok(idx) => Some(idx),
_ => None,
}).sorted()
.collect::<Vec<_>>();
let right_field_indices = cols.iter()
.filter_map(|c| match right_df_schema.index_of_column(c) {
Ok(idx) => Some(idx),
_ => None,
}).sorted()
.collect::<Vec<_>>();
// Collect DFFields and Fields required for intermediate schemas
let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) = left_field_indices.clone()
.into_iter()
.map(|i| (
left_df_schema.field(i).clone(),
physical_left.schema().field(i).clone(),
))
.chain(
right_field_indices.clone()
.into_iter()
.map(|i| (
right_df_schema.field(i).clone(),
physical_right.schema().field(i).clone(),
))
)
.unzip();
// Construct intermediate schemas used for filtering data and
// convert logical expression to physical according to filter schema
let filter_df_schema = DFSchema::new_with_metadata(filter_df_fields, HashMap::new())?;
let filter_schema = Schema::new_with_metadata(filter_fields, HashMap::new());
let filter_expr = create_physical_expr(
expr,
&filter_df_schema,
&filter_schema,
session_state.execution_props(),
)?;
let column_indices = join_utils::JoinFilter::build_column_indices(left_field_indices, right_field_indices);
Some(join_utils::JoinFilter::new(
filter_expr,
column_indices,
filter_schema,
))
}
_ => None
};
let prefer_hash_join = session_state.config_options().optimizer.prefer_hash_join;
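// Choose the join strategy:
// - no equi-join keys: NestedLoopJoinExec
// - repartitioning enabled and hash join not preferred: SortMergeJoinExec
// - repartitioning enabled and hash join preferred: partitioned HashJoinExec
// - otherwise: HashJoinExec with CollectLeft mode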
if join_on.is_empty() {
// there is no equi-join condition, use a nested loop join
// TODO optimize the plan, and use the config of `target_partitions` and `repartition_joins`
Ok(Arc::new(NestedLoopJoinExec::try_new(
physical_left,
physical_right,
join_filter,
join_type,
)?))
} else if session_state.config().target_partitions() > 1
&& session_state.config().repartition_joins()
&& !prefer_hash_join
{
// Use SortMergeJoin if hash join is not preferred
// Sort-merge join support is currently experimental
if join_filter.is_some() {
// TODO SortMergeJoinExec need to support join filter
Err(DataFusionError::NotImplemented("SortMergeJoinExec does not support join_filter now.".to_string()))
} else {
let join_on_len = join_on.len();
Ok(Arc::new(SortMergeJoinExec::try_new(
physical_left,
physical_right,
join_on,
*join_type,
vec![SortOptions::default(); join_on_len],
null_equals_null,
)?))
}
} else if session_state.config().target_partitions() > 1
&& session_state.config().repartition_joins()
&& prefer_hash_join {
let partition_mode = {
if session_state.config().collect_statistics() {
PartitionMode::Auto
} else {
PartitionMode::Partitioned
}
};
Ok(Arc::new(HashJoinExec::try_new(
physical_left,
physical_right,
join_on,
join_filter,
join_type,
partition_mode,
null_equals_null,
)?))
} else {
Ok(Arc::new(HashJoinExec::try_new(
physical_left,
physical_right,
join_on,
join_filter,
join_type,
PartitionMode::CollectLeft,
null_equals_null,
)?))
}
}
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
let left_right = self.create_initial_plan_multi([left.as_ref(), right.as_ref()], session_state).await?;
let [left, right]: [Arc<dyn ExecutionPlan>; 2] = left_right.try_into().map_err(|_| DataFusionError::Internal("`create_initial_plan_multi` is broken".to_string()))?;
Ok(Arc::new(CrossJoinExec::new(left, right)))
}
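// Note: subqueries are expected to be rewritten away (e.g. decorrelated
// into joins) by the optimizer before physical planning reaches them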
LogicalPlan::Subquery(_) => todo!(),
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row,
schema,
}) => Ok(Arc::new(EmptyExec::new(
*produce_one_row,
SchemaRef::new(schema.as_ref().to_owned().into()),
))),
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
self.create_initial_plan(input, session_state).await
}
LogicalPlan::Limit(Limit { input, skip, fetch, .. }) => {
let input = self.create_initial_plan(input, session_state).await?;
// GlobalLimitExec requires a single partition for input
let input = if input.output_partitioning().partition_count() == 1 {
input
} else {
// Apply a LocalLimitExec to each partition. The optimizer will also insert
// a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec
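// e.g. with skip = 5 and fetch = 10, each partition keeps at most 15 rows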
if let Some(fetch) = fetch {
Arc::new(LocalLimitExec::new(input, *fetch + skip))
} else {
input
}
};
Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
}
LogicalPlan::Unnest(Unnest { input, column, schema }) => {
let input = self.create_initial_plan(input, session_state).await?;
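// Resolve the unnest column to its physical index in the output schema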
let column_exec = schema.index_of_column(column)
.map(|idx| Column::new(&column.name, idx))?;
let schema = SchemaRef::new(schema.as_ref().to_owned().into());
Ok(Arc::new(UnnestExec::new(input, column_exec, schema)))
}
LogicalPlan::Ddl(ddl) => {
// There is no default plan for DDL statements --
// it must be handled at a higher level (so that
// the appropriate table can be registered with
// the context)
let name = ddl.name();
Err(DataFusionError::NotImplemented(
format!("Unsupported logical plan: {name}")
))
}
LogicalPlan::Prepare(_) => {
// There is no default plan for "PREPARE" -- it must be
// handled at a higher level (so that the appropriate
// statement can be prepared)
Err(DataFusionError::NotImplemented(
"Unsupported logical plan: Prepare".to_string(),
))
}
LogicalPlan::Dml(_) => {
// DataFusion is a read-only query engine, but also a library, so consumers may implement this
Err(DataFusionError::NotImplemented(
"Unsupported logical plan: Dml".to_string(),
))
}
LogicalPlan::Statement(statement) => {
// DataFusion is a read-only query engine, but also a library, so consumers may implement this
let name = statement.name();
Err(DataFusionError::NotImplemented(
format!("Unsupported logical plan: Statement({name})")
))
}
LogicalPlan::DescribeTable(_) => {
Err(DataFusionError::Internal(
"Unsupported logical plan: DescribeTable must be root of the plan".to_string(),
))
}
LogicalPlan::Explain(_) => Err(DataFusionError::Internal(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
)),
LogicalPlan::Distinct(_) => {
Err(DataFusionError::Internal(
"Unsupported logical plan: Distinct should be replaced to Aggregate".to_string(),
))
}
LogicalPlan::Analyze(_) => Err(DataFusionError::Internal(
"Unsupported logical plan: Analyze must be root of the plan".to_string(),
)),
LogicalPlan::Extension(e) => {
let physical_inputs = self.create_initial_plan_multi(e.node.inputs(), session_state).await?;
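// Try each registered extension planner in order; the first one that
// returns Some(plan) wins.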
let mut maybe_plan = None;
for planner in &self.extension_planners {
if maybe_plan.is_some() {
break;
}
let logical_input = e.node.inputs();
maybe_plan = planner.plan_extension(
self,
e.node.as_ref(),
&logical_input,
&physical_inputs,
session_state,
).await?;
}
let plan = match maybe_plan {
Some(v) => Ok(v),
_ => plan_err!("No installed planner was able to convert the custom node to an execution plan: {:?}", e.node)
}?;
// Ensure the ExecutionPlan's schema matches the
// declared logical schema to catch and warn about
// logic errors when creating user defined plans.
if !e.node.schema().matches_arrow_schema(&plan.schema()) {
plan_err!(
"Extension planner for {:?} created an ExecutionPlan with mismatched schema. \
LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}",
e.node, e.node.schema(), plan.schema()
)
} else {
Ok(plan)
}
}
};
exec_plan
}.boxed()
}