in src/isolator.rs [85:115]
fn execute(
&self,
partition: usize,
context: std::sync::Arc<datafusion::execution::TaskContext>,
) -> Result<SendableRecordBatchStream> {
let config = context.session_config();
let partition_group = &config
.get_extension::<PartitionGroup>()
.ok_or(internal_datafusion_err!(
"PartitionGroup not set in session config"
))?
.0;
if partition > self.partition_count {
error!(
"PartitionIsolatorExec asked to execute partition {} but only has {} partitions",
partition, self.partition_count
);
return Err(internal_datafusion_err!(
"Invalid partition {} for PartitionIsolatorExec",
partition
));
}
let output_stream = match partition_group.get(partition) {
Some(actual_partition_number) => self.input.execute(*actual_partition_number, context),
None => Ok(Box::pin(EmptyRecordBatchStream::new(self.input.schema()))
as SendableRecordBatchStream),
};
output_stream
}