in src/processor_service.rs [174:193]
fn make_stream(
inner: &DFRayProcessorHandlerInner,
partition: usize,
) -> Result<impl Stream<Item = Result<RecordBatch, FlightError>> + Send + 'static, Status> {
let task_ctx = inner.ctx.task_ctx();
let stream = inner
.plan
.execute(partition, task_ctx)
.inspect_err(|e| {
error!(
"{}",
format!("Could not get partition stream from plan {e}")
)
})
.map_err(|e| Status::internal(format!("Could not get partition stream from plan {e}")))?
.map_err(|e| FlightError::from_external_error(Box::new(e)));
Ok(stream)
}