in datafusion/core/src/physical_plan/sorts/merge.rs [206:259]
fn poll_next_inner(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
if self.aborted {
return Poll::Ready(None);
}
// try to initialize the loser tree
if self.loser_tree.is_empty() {
// Ensure all non-exhausted streams have a cursor from which
// rows can be pulled
for i in 0..self.streams.partitions() {
if let Err(e) = ready!(self.maybe_poll_stream(cx, i)) {
self.aborted = true;
return Poll::Ready(Some(Err(e)));
}
}
self.init_loser_tree();
}
// NB timer records time taken on drop, so there are no
// calls to `timer.done()` below.
let elapsed_compute = self.metrics.elapsed_compute().clone();
let _timer = elapsed_compute.timer();
loop {
// Adjust the loser tree if necessary, returning control if needed
if !self.loser_tree_adjusted {
let winner = self.loser_tree[0];
if let Err(e) = ready!(self.maybe_poll_stream(cx, winner)) {
self.aborted = true;
return Poll::Ready(Some(Err(e)));
}
self.update_loser_tree();
}
let stream_idx = self.loser_tree[0];
if self.advance(stream_idx) {
self.loser_tree_adjusted = false;
self.in_progress.push_row(stream_idx);
// stop sorting if fetch has been reached
if self.fetch_reached() {
self.aborted = true;
} else if self.in_progress.len() < self.batch_size {
continue;
}
}
self.produced += self.in_progress.len();
return Poll::Ready(self.in_progress.build_record_batch().transpose());
}
}