fn execute()

in src/pre_fetch.rs [75:103]


    /// Executes the input plan for `partition` and returns a stream that
    /// yields its items through a bounded channel created with capacity
    /// `self.buf_size`.
    ///
    /// A background task started with `tokio::spawn` drains the input stream
    /// and pushes every item it produces (successful batches and errors
    /// alike) into the channel; the returned stream yields whatever arrives
    /// on the receiving side, in order.
    ///
    /// # Errors
    ///
    /// Returns an error if executing the input plan for `partition` fails.
    /// Errors produced later by the input stream are not returned here; they
    /// are forwarded as items of the output stream.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    fn execute(
        &self,
        partition: usize,
        context: std::sync::Arc<datafusion::execution::TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let (tx, mut rx) = channel(self.buf_size);

        let mut input_stream = self.input.execute(partition, context)?;

        let consume_fut = async move {
            while let Some(batch) = input_stream.next().await {
                // A send error means the receiving side is gone, e.g. the
                // output stream was dropped before the input was exhausted.
                // Nobody is left to consume further items, so stop pulling
                // from the input instead of panicking inside the task.
                if tx.send(batch).await.is_err() {
                    break;
                }
            }
            // Leaving this block drops `tx`, which ends the output stream
            // once the items still buffered in the channel are consumed.
        };

        // The task is detached: it finishes when the input is exhausted or
        // when the receiver goes away (see the send error handling above).
        tokio::spawn(consume_fut);

        let out_stream = async_stream::stream! {
            while let Some(batch) = rx.recv().await {
                yield batch;
            }
        };

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema().clone(),
            out_stream,
        )))
    }