src/max_rows.rs (52 lines of code) (raw):
use std::{fmt::Formatter, sync::Arc};
use datafusion::{
error::Result,
execution::SendableRecordBatchStream,
physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties},
};
use crate::util::max_rows_stream;
/// An execution plan that never yields a batch with more than `max_rows` rows.
///
/// If its input produces a batch with more than `max_rows` rows, the batch is
/// zero-copy split, and this is repeated until the remaining batch has
/// `<= max_rows` rows. Each of the resulting batches is yielded as a separate
/// stream item.
///
/// The splitting itself is delegated to `crate::util::max_rows_stream`, which
/// wraps the input's stream in `execute`.
#[derive(Debug)]
pub struct MaxRowsExec {
/// The child plan whose output batches are capped.
pub input: Arc<dyn ExecutionPlan>,
/// Upper bound on the number of rows in any yielded batch.
pub max_rows: usize,
}
impl MaxRowsExec {
pub fn new(input: Arc<dyn ExecutionPlan>, max_rows: usize) -> Self {
Self { input, max_rows }
}
}
impl DisplayAs for MaxRowsExec {
    /// Renders the operator as `MaxRowsExec[max_rows=N]`.
    ///
    /// The requested display format is ignored: every format type produces
    /// the same single-line output.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        let limit = self.max_rows;
        f.write_fmt(format_args!("MaxRowsExec[max_rows={}]", limit))
    }
}
impl ExecutionPlan for MaxRowsExec {
fn name(&self) -> &str {
"MaxRowsExec"
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn properties(&self) -> &PlanProperties {
self.input.properties()
}
fn children(&self) -> Vec<&std::sync::Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
fn with_new_children(
self: std::sync::Arc<Self>,
children: Vec<std::sync::Arc<dyn ExecutionPlan>>,
) -> Result<std::sync::Arc<dyn ExecutionPlan>> {
// TODO: generalize this
assert_eq!(children.len(), 1);
Ok(Arc::new(Self::new(children[0].clone(), self.max_rows)))
}
fn execute(
&self,
partition: usize,
context: std::sync::Arc<datafusion::execution::TaskContext>,
) -> Result<SendableRecordBatchStream> {
self.input
.execute(partition, context)
.map(|stream| max_rows_stream(stream, self.max_rows))
}
}