fn make_stream()

in src/processor_service.rs [174:193]


fn make_stream(
    inner: &DFRayProcessorHandlerInner,
    partition: usize,
) -> Result<impl Stream<Item = Result<RecordBatch, FlightError>> + Send + 'static, Status> {
    let task_ctx = inner.ctx.task_ctx();

    let stream = inner
        .plan
        .execute(partition, task_ctx)
        .inspect_err(|e| {
            error!(
                "{}",
                format!("Could not get partition stream from plan {e}")
            )
        })
        .map_err(|e| Status::internal(format!("Could not get partition stream from plan {e}")))?
        .map_err(|e| FlightError::from_external_error(Box::new(e)));

    Ok(stream)
}