in src/pre_fetch.rs [75:103]
fn execute(
&self,
partition: usize,
context: std::sync::Arc<datafusion::execution::TaskContext>,
) -> Result<SendableRecordBatchStream> {
let (tx, mut rx) = channel(self.buf_size);
let mut input_stream = self.input.execute(partition, context)?;
let consume_fut = async move {
while let Some(batch) = input_stream.next().await {
// TODO: how to neatly errors within this macro?
tx.send(batch).await.unwrap();
}
};
tokio::spawn(consume_fut);
let out_stream = async_stream::stream! {
while let Some(batch) = rx.recv().await {
yield batch;
}
};
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema().clone(),
out_stream,
)))
}