// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Some of these functions reference the Postgres documentation
// or implementation to ensure compatibility and are subject to
// the Postgres license.

//! The Union operator combines multiple inputs with the same schema

use std::pin::Pin;
use std::task::{Context, Poll};
use std::{any::Any, sync::Arc};

use arrow::{
    datatypes::{Field, Schema, SchemaRef},
    record_batch::RecordBatch,
};
use datafusion_common::{DFSchemaRef, DataFusionError};
use futures::Stream;
use itertools::Itertools;
use log::{debug, trace, warn};

use super::DisplayAs;
use super::{
    expressions::PhysicalSortExpr,
    metrics::{ExecutionPlanMetricsSet, MetricsSet},
    ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning,
    RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use crate::physical_plan::common::get_meet_of_orderings;
use crate::physical_plan::stream::ObservedStream;
use crate::physical_plan::{expressions, metrics::BaselineMetrics};
use datafusion_common::Result;
use datafusion_execution::TaskContext;
use tokio::macros::support::thread_rng_n;

/// `UnionExec`: `UNION ALL` execution plan.
///
/// `UnionExec` combines multiple inputs with the same schema by
/// concatenating the partitions. It does not mix or copy data within
/// or across partitions. Thus if the input partitions are sorted, the
/// output partitions of the union are also sorted.
///
/// For example, given a `UnionExec` of two inputs, with `N`
/// partitions, and `M` partitions, there will be `N+M` output
/// partitions. The first `N` output partitions are from Input 1
/// partitions, and the next `M` output partitions are from Input 2.
///
/// ```text
///                     ▲       ▲           ▲         ▲
///                     │       │           │         │
///     Output          │  ...  │           │         │
///   Partitions        │0      │N-1        │ N       │N+M-1
///(passes through  ┌────┴───────┴───────────┴─────────┴───┐
/// the N+M input   │              UnionExec               │
///  partitions)    │                                      │
///                 └──────────────────────────────────────┘
///                                     ▲
///                                     │
///                                     │
///       Input         ┌────────┬─────┴────┬──────────┐
///     Partitions      │ ...    │          │     ...  │
///                     │0       │ N-1      │0         │ M-1
///                 ┌────┴────────┴───┐ ┌───┴──────────┴───┐
///                 │                 │ │                  │
///                 │                 │ │                  │
///                 │                 │ │                  │
///                 │                 │ │                  │
///                 │                 │ │                  │
///                 │                 │ │                  │
///                 │Input 1          │ │Input 2           │
///                 └─────────────────┘ └──────────────────┘
/// ```
#[derive(Debug)]
pub struct UnionExec {
    /// Input execution plan
    inputs: Vec<Arc<dyn ExecutionPlan>>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Schema of Union
    schema: SchemaRef,
}

impl UnionExec {
    /// Create a new UnionExec with specified schema.
    /// The `schema` should always be a subset of the schema of `inputs`,
    /// otherwise, an error will be returned.
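    ///
    /// A minimal usage sketch (not a doc-test; `plan_a`, `plan_b`, and
    /// `df_schema` are hypothetical values with compatible fields):
    ///
    /// ```ignore
    /// // Both inputs are Arc<dyn ExecutionPlan> with the same column layout;
    /// // `df_schema` names a subset of their fields.
    /// let union = UnionExec::try_new_with_schema(vec![plan_a, plan_b], df_schema)?;
    /// ```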
    pub fn try_new_with_schema(
        inputs: Vec<Arc<dyn ExecutionPlan>>,
        schema: DFSchemaRef,
    ) -> Result<Self> {
        let mut exec = Self::new(inputs);
        let exec_schema = exec.schema();
        let fields = schema
            .fields()
            .iter()
            .map(|dff| {
                exec_schema
                    .field_with_name(dff.name())
                    .cloned()
                    .map_err(|_| {
                        DataFusionError::Internal(format!(
                            "Cannot find the field {:?} in child schema",
                            dff.name()
                        ))
                    })
            })
            .collect::<Result<Vec<Field>>>()?;
        let schema = Arc::new(Schema::new_with_metadata(
            fields,
            exec.schema().metadata().clone(),
        ));
        exec.schema = schema;
        Ok(exec)
    }

    /// Create a new UnionExec
    pub fn new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Self {
        let schema = union_schema(&inputs);

        UnionExec {
            inputs,
            metrics: ExecutionPlanMetricsSet::new(),
            schema,
        }
    }

    /// Get inputs of the execution plan
    pub fn inputs(&self) -> &Vec<Arc<dyn ExecutionPlan>> {
        &self.inputs
    }
}

impl DisplayAs for UnionExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "UnionExec")
            }
        }
    }
}

impl ExecutionPlan for UnionExec {
    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Specifies whether this plan generates an infinite stream of records.
    /// If the plan does not support pipelining, but its input(s) are
    /// infinite, returns an error to indicate this.
    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
        Ok(children.iter().any(|x| *x))
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        self.inputs.clone()
    }

    /// Output of the union is the combination of all output partitions of the inputs
    fn output_partitioning(&self) -> Partitioning {
        // Output the combination of all output partitions of the inputs if the Union is not partition aware
        let num_partitions = self
            .inputs
            .iter()
            .map(|plan| plan.output_partitioning().partition_count())
            .sum();

        Partitioning::UnknownPartitioning(num_partitions)
    }

    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        // The output ordering is the "meet" of its input orderings.
        // The meet is the finest ordering that is satisfied by all the input
        // orderings, see https://en.wikipedia.org/wiki/Join_and_meet.
        get_meet_of_orderings(&self.inputs)
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        // If the Union has an output ordering, it maintains at least one
        // child's ordering (i.e. the meet).
        // For instance, assume that the first child is SortExpr('a','b','c'),
        // the second child is SortExpr('a','b') and the third child is
        // SortExpr('a','b'). The output ordering would be SortExpr('a','b'),
        // which is the "meet" of all input orderings. In this example, this
        // function will return vec![false, true, true], indicating that we
        // preserve the orderings for the 2nd and the 3rd children.
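        // Since the meet is satisfied by every child, a child whose ordering
        // has the same number of expressions as the meet preserves it exactly,
        // which is what the length comparison below checks.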
        if let Some(output_ordering) = self.output_ordering() {
            self.inputs()
                .iter()
                .map(|child| {
                    if let Some(child_ordering) = child.output_ordering() {
                        output_ordering.len() == child_ordering.len()
                    } else {
                        false
                    }
                })
                .collect()
        } else {
            vec![false; self.inputs().len()]
        }
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(UnionExec::new(children)))
    }

    fn execute(
        &self,
        mut partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start UnionExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );
        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
        // record the tiny amount of work done in this function so
        // elapsed_compute is reported as non zero
        let elapsed_compute = baseline_metrics.elapsed_compute().clone();
        let _timer = elapsed_compute.timer(); // record on drop

        // find partition to execute
        for input in self.inputs.iter() {
            // Check whether the requested partition falls within the current input
            if partition < input.output_partitioning().partition_count() {
                let stream = input.execute(partition, context)?;
                debug!("Found a Union partition to execute");
                return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
            } else {
                partition -= input.output_partitioning().partition_count();
            }
        }

        warn!("Error in Union: Partition {} not found", partition);

        Err(DataFusionError::Execution(format!(
            "Partition {partition} not found in Union"
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Statistics {
        self.inputs
            .iter()
            .map(|ep| ep.statistics())
            .reduce(stats_union)
            .unwrap_or_default()
    }

    fn benefits_from_input_partitioning(&self) -> bool {
        false
    }
}

/// Combines multiple input streams by interleaving them.
///
/// This only works if all inputs have the same hash-partitioning.
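///
/// Output partition `i` merges the `i`-th partition of every input (see
/// `execute`), so rows stay in the hash buckets they were assigned to and
/// the output keeps each input's partition count.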
///
/// # Data Flow
/// ```text
/// +---------+
/// |         |---+
/// | Input 1 |   |
/// |         |-------------+
/// +---------+   |         |
///               |         |         +---------+
///               +------------------>|         |
///                 +---------------->| Combine |-->
///                 | +-------------->|         |
///                 | |     |         +---------+
/// +---------+     | |     |
/// |         |-----+ |     |
/// | Input 2 |       |     |
/// |         |---------------+
/// +---------+       |     | |
///                   |     | |       +---------+
///                   |     +-------->|         |
///                   |       +------>| Combine |-->
///                   |         +---->|         |
///                   |         |     +---------+
/// +---------+       |         |
/// |         |-------+         |
/// | Input 3 |                 |
/// |         |-----------------+
/// +---------+
/// ```
#[derive(Debug)]
pub struct InterleaveExec {
    /// Input execution plan
    inputs: Vec<Arc<dyn ExecutionPlan>>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Schema of Interleave
    schema: SchemaRef,
}

impl InterleaveExec {
    /// Create a new InterleaveExec
    pub fn try_new(inputs: Vec<Arc<dyn ExecutionPlan>>) -> Result<Self> {
        let schema = union_schema(&inputs);

        if !can_interleave(&inputs) {
            return Err(DataFusionError::Internal(String::from(
                "Not all InterleaveExec children have a consistent hash partitioning",
            )));
        }

        Ok(InterleaveExec {
            inputs,
            metrics: ExecutionPlanMetricsSet::new(),
            schema,
        })
    }

    /// Get inputs of the execution plan
    pub fn inputs(&self) -> &Vec<Arc<dyn ExecutionPlan>> {
        &self.inputs
    }
}

impl DisplayAs for InterleaveExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "InterleaveExec")
            }
        }
    }
}

impl ExecutionPlan for InterleaveExec {
    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Specifies whether this plan generates an infinite stream of records.
    /// If the plan does not support pipelining, but its input(s) are
    /// infinite, returns an error to indicate this.
    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
        Ok(children.iter().any(|x| *x))
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        self.inputs.clone()
    }

    /// All inputs must have the same partitioning. The output partitioning of
    /// InterleaveExec is the same as the inputs (NOT combined). E.g. if there
    /// are 10 inputs where each is `Hash(3)`-partitioned, InterleaveExec is
    /// also `Hash(3)`-partitioned.
    fn output_partitioning(&self) -> Partitioning {
        self.inputs[0].output_partitioning()
    }

    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![false; self.inputs().len()]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(InterleaveExec::try_new(children)?))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        trace!(
            "Start InterleaveExec::execute for partition {} of context session_id {} and task_id {:?}",
            partition,
            context.session_id(),
            context.task_id()
        );
        let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
        // record the tiny amount of work done in this function so
        // elapsed_compute is reported as non zero
        let elapsed_compute = baseline_metrics.elapsed_compute().clone();
        let _timer = elapsed_compute.timer(); // record on drop

        let mut input_stream_vec = vec![];
        for input in self.inputs.iter() {
            if partition < input.output_partitioning().partition_count() {
                input_stream_vec.push(input.execute(partition, context.clone())?);
            } else {
                // This input does not have the requested partition
                break;
            }
        }
        if input_stream_vec.len() == self.inputs.len() {
            let stream = Box::pin(CombinedRecordBatchStream::new(
                self.schema(),
                input_stream_vec,
            ));
            return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
        }

        warn!("Error in InterleaveExec: Partition {} not found", partition);

        Err(DataFusionError::Execution(format!(
            "Partition {partition} not found in InterleaveExec"
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> Statistics {
        self.inputs
            .iter()
            .map(|ep| ep.statistics())
            .reduce(stats_union)
            .unwrap_or_default()
    }

    fn benefits_from_input_partitioning(&self) -> bool {
        false
    }
}

/// If all the input partitions have the same Hash partition spec as the
/// first input, the InterleaveExec is partition aware.
///
/// This might be too strict in cases where the input partition specs are
/// compatible but not exactly the same. For example, if one input has the
/// partition spec Hash('a','b','c') and another has the partition spec
/// Hash('a'), it would be safe to derive the output partitioning with the
/// spec Hash('a','b','c').
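///
/// A hedged sketch of the contract (not a doc-test; `hash_plan` is a
/// hypothetical helper returning a plan with the given hash partitioning):
///
/// ```ignore
/// let a = hash_plan(vec![col("x")], 3); // Partitioning::Hash([x], 3)
/// let b = hash_plan(vec![col("x")], 3); // identical spec
/// assert!(can_interleave(&[a, b]));
///
/// let empty: Vec<Arc<dyn ExecutionPlan>> = vec![];
/// assert!(!can_interleave(&empty)); // an empty list is never interleavable
/// ```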
pub fn can_interleave(inputs: &[Arc<dyn ExecutionPlan>]) -> bool {
    if inputs.is_empty() {
        return false;
    }

    let first_input_partition = inputs[0].output_partitioning();
    matches!(first_input_partition, Partitioning::Hash(_, _))
        && inputs
            .iter()
            .map(|plan| plan.output_partitioning())
            .all(|partition| partition == first_input_partition)
}

fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
    let fields: Vec<Field> = (0..inputs[0].schema().fields().len())
        .map(|i| {
            inputs
                .iter()
                .filter_map(|input| {
                    if input.schema().fields().len() > i {
                        Some(input.schema().field(i).clone())
                    } else {
                        None
                    }
                })
                .find_or_first(|f| f.is_nullable())
                .unwrap()
        })
        .collect();

    Arc::new(Schema::new_with_metadata(
        fields,
        inputs[0].schema().metadata().clone(),
    ))
}

/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one
struct CombinedRecordBatchStream {
    /// Schema wrapped by Arc
    schema: SchemaRef,
    /// Stream entries
    entries: Vec<SendableRecordBatchStream>,
}

impl CombinedRecordBatchStream {
    /// Create a CombinedRecordBatchStream
    pub fn new(schema: SchemaRef, entries: Vec<SendableRecordBatchStream>) -> Self {
        Self { schema, entries }
    }
}

impl RecordBatchStream for CombinedRecordBatchStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl Stream for CombinedRecordBatchStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        use Poll::*;

        // Pick a random starting stream so polling is fair across entries.
        let start = thread_rng_n(self.entries.len() as u32) as usize;
        let mut idx = start;

        for _ in 0..self.entries.len() {
            let stream = self.entries.get_mut(idx).unwrap();

            match Pin::new(stream).poll_next(cx) {
                Ready(Some(val)) => return Ready(Some(val)),
                Ready(None) => {
                    // Remove the entry
                    self.entries.swap_remove(idx);

                    // Check if this was the last entry, if so the cursor needs
                    // to wrap
                    if idx == self.entries.len() {
                        idx = 0;
                    } else if idx < start && start <= self.entries.len() {
                        // The stream being swapped into the current index has
                        // already been polled, so skip it.
                        idx = idx.wrapping_add(1) % self.entries.len();
                    }
                }
                Pending => {
                    idx = idx.wrapping_add(1) % self.entries.len();
                }
            }
        }

        // If there are no entries left, the stream is complete.
        if self.entries.is_empty() {
            Ready(None)
        } else {
            Pending
        }
    }
}

fn col_stats_union(
    mut left: ColumnStatistics,
    right: ColumnStatistics,
) -> ColumnStatistics {
    left.distinct_count = None;
    left.min_value = left
        .min_value
        .zip(right.min_value)
        .map(|(a, b)| expressions::helpers::min(&a, &b))
        .and_then(Result::ok);
    left.max_value = left
        .max_value
        .zip(right.max_value)
        .map(|(a, b)| expressions::helpers::max(&a, &b))
        .and_then(Result::ok);
    left.null_count = left.null_count.zip(right.null_count).map(|(a, b)| a + b);

    left
}

fn stats_union(mut left: Statistics, right: Statistics) -> Statistics {
    left.is_exact = left.is_exact && right.is_exact;
    left.num_rows = left.num_rows.zip(right.num_rows).map(|(a, b)| a + b);
    left.total_byte_size = left
        .total_byte_size
        .zip(right.total_byte_size)
        .map(|(a, b)| a + b);
    left.column_statistics =
        left.column_statistics
            .zip(right.column_statistics)
            .map(|(a, b)| {
                a.into_iter()
                    .zip(b)
                    .map(|(ca, cb)| col_stats_union(ca, cb))
                    .collect()
            });

    left
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test;

    use crate::prelude::SessionContext;
    use crate::{physical_plan::collect, scalar::ScalarValue};
    use arrow::record_batch::RecordBatch;

    #[tokio::test]
    async fn test_union_partitions() -> Result<()> {
        let session_ctx = SessionContext::new();
        let task_ctx = session_ctx.task_ctx();

        // Create csv's with different partitioning
        let csv = test::scan_partitioned_csv(4)?;
        let csv2 = test::scan_partitioned_csv(5)?;

        let union_exec = Arc::new(UnionExec::new(vec![csv, csv2]));

        // Should have 9 partitions and 9 output batches
        assert_eq!(union_exec.output_partitioning().partition_count(), 9);

        let result: Vec<RecordBatch> = collect(union_exec, task_ctx).await?;
        assert_eq!(result.len(), 9);

        Ok(())
    }

    #[tokio::test]
    async fn test_stats_union() {
        let left = Statistics {
            is_exact: true,
            num_rows: Some(5),
            total_byte_size: Some(23),
            column_statistics: Some(vec![
                ColumnStatistics {
                    distinct_count: Some(5),
                    max_value: Some(ScalarValue::Int64(Some(21))),
                    min_value: Some(ScalarValue::Int64(Some(-4))),
                    null_count: Some(0),
                },
                ColumnStatistics {
                    distinct_count: Some(1),
                    max_value: Some(ScalarValue::Utf8(Some(String::from("x")))),
                    min_value: Some(ScalarValue::Utf8(Some(String::from("a")))),
                    null_count: Some(3),
                },
                ColumnStatistics {
                    distinct_count: None,
                    max_value: Some(ScalarValue::Float32(Some(1.1))),
                    min_value: Some(ScalarValue::Float32(Some(0.1))),
                    null_count: None,
                },
            ]),
        };

        let right = Statistics {
            is_exact: true,
            num_rows: Some(7),
            total_byte_size: Some(29),
            column_statistics: Some(vec![
                ColumnStatistics {
                    distinct_count: Some(3),
                    max_value: Some(ScalarValue::Int64(Some(34))),
                    min_value: Some(ScalarValue::Int64(Some(1))),
                    null_count: Some(1),
                },
                ColumnStatistics {
                    distinct_count: None,
                    max_value: Some(ScalarValue::Utf8(Some(String::from("c")))),
                    min_value: Some(ScalarValue::Utf8(Some(String::from("b")))),
                    null_count: None,
                },
                ColumnStatistics {
                    distinct_count: None,
                    max_value: None,
                    min_value: None,
                    null_count: None,
                },
            ]),
        };

        let result = stats_union(left, right);
        let expected = Statistics {
            is_exact: true,
            num_rows: Some(12),
            total_byte_size: Some(52),
            column_statistics: Some(vec![
                ColumnStatistics {
                    distinct_count: None,
                    max_value: Some(ScalarValue::Int64(Some(34))),
                    min_value: Some(ScalarValue::Int64(Some(-4))),
                    null_count: Some(1),
                },
                ColumnStatistics {
                    distinct_count: None,
                    max_value: Some(ScalarValue::Utf8(Some(String::from("x")))),
                    min_value: Some(ScalarValue::Utf8(Some(String::from("a")))),
                    null_count: None,
                },
                ColumnStatistics {
                    distinct_count: None,
                    max_value: None,
                    min_value: None,
                    null_count: None,
                },
            ]),
        };

        assert_eq!(result, expected);
    }
}