in datafusion/physical-plan/src/sorts/merge.rs [209:284]
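/// Initializes a cursor for every input partition, builds the loser tree, and
/// then repeatedly appends the row of the current winner to the in-progress
/// output until a full batch is assembled, the fetch limit is reached, or all
/// inputs are exhausted.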
fn poll_next_inner(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
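// A previous error, or hitting the fetch limit, permanently terminates this stream.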
if self.aborted {
return Poll::Ready(None);
}
// Once every partition has been initialized (its first cursor created, or the
// stream found to be finished), this block is skipped. Until then, this function
// may be called multiple times and returns Poll::Pending whenever a partition is
// not yet ready.
if self.loser_tree.is_empty() {
while let Some(&partition_idx) = self.uninitiated_partitions.front() {
match self.maybe_poll_stream(cx, partition_idx) {
Poll::Ready(Err(e)) => {
self.aborted = true;
return Poll::Ready(Some(Err(e)));
}
Poll::Pending => {
// If a partition returns Poll::Pending, move it to the back of the polling
// queue so that it is not polled repeatedly, which could otherwise grow
// upstream buffer sizes.
self.uninitiated_partitions.rotate_left(1);
// Returning Poll::Pending here could leave this task waiting on the rotated
// partition, so wake it manually so the remaining partitions are initialized
// promptly. A more natural approach that does not disrupt the runtime
// scheduler could be investigated.
cx.waker().wake_by_ref();
return Poll::Pending;
}
_ => {
// If the poll result is Poll::Ready(Some(batch)) or Poll::Ready(None), remove
// this partition from the initialization queue so it is not polled here again.
self.uninitiated_partitions.pop_front();
}
}
}
// All partitions are initialized; reclaim the memory held by the now-empty queue
self.uninitiated_partitions.shrink_to_fit();
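// Every cursor is in place; build the loser tree that will drive the merge.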
self.init_loser_tree();
}
// NB timer records time taken on drop, so there are no
// calls to `timer.done()` below.
let elapsed_compute = self.metrics.elapsed_compute().clone();
let _timer = elapsed_compute.timer();
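// Merge loop: pick the stream whose cursor is next in the sort order (the
// loser tree winner) and append its row to the in-progress batch, repeating
// until a batch can be returned.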
loop {
// Adjust the loser tree if necessary, yielding (returning Poll::Pending) if the
// winner's input is not ready
if !self.loser_tree_adjusted {
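// The previous winner may have exhausted its current batch; `maybe_poll_stream`
// polls its input again if needed before the tree is re-adjusted.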
let winner = self.loser_tree[0];
if let Err(e) = ready!(self.maybe_poll_stream(cx, winner)) {
self.aborted = true;
return Poll::Ready(Some(Err(e)));
}
self.update_loser_tree();
}
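// After adjustment, loser_tree[0] holds the index of the stream whose cursor
// is next in the sort order.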
let stream_idx = self.loser_tree[0];
if self.advance_cursors(stream_idx) {
self.loser_tree_adjusted = false;
self.in_progress.push_row(stream_idx);
// Stop producing rows once the fetch limit has been reached
if self.fetch_reached() {
self.aborted = true;
} else if self.in_progress.len() < self.batch_size {
continue;
}
}
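// Emit the accumulated rows: the batch is full, the fetch limit was reached,
// or there are no more input rows to merge.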
self.produced += self.in_progress.len();
return Poll::Ready(self.in_progress.build_record_batch().transpose());
}
}