in src/columnar_storage/src/read.rs [349:384]
fn poll_next(
mut self: Pin<&mut Self>,
ctx: &mut std::task::Context,
) -> Poll<Option<Self::Item>> {
loop {
match self.stream.poll_next_unpin(ctx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
let value = if let Some(mut pending) = self.pending_batch.take() {
self.maybe_remove_builtin_columns(&mut pending);
let res = self
.value_operator
.merge(pending)
.map_err(|e| DataFusionError::External(Box::new(e)));
Some(res)
} else {
None
};
return Poll::Ready(value);
}
Poll::Ready(Some(v)) => match v {
Ok(v) => match self.merge_batch(v) {
Ok(v) => {
if let Some(v) = v {
return Poll::Ready(Some(Ok(v)));
}
}
Err(e) => {
return Poll::Ready(Some(Err(DataFusionError::External(Box::new(e)))))
}
},
Err(e) => return Poll::Ready(Some(Err(e))),
},
}
}
}