fn poll_next_inner()

in datafusion/physical-plan/src/sorts/merge.rs [209:284]


    /// Advances the merge and produces the next output batch, if one is ready.
    ///
    /// The work is done in two phases:
    ///
    /// 1. **Start-up** (only while `loser_tree` is empty): the partition at the
    ///    front of `uninitiated_partitions` is polled through
    ///    `maybe_poll_stream`. A ready partition is popped from the queue, a
    ///    pending one is rotated to the back and `Poll::Pending` is returned
    ///    after waking the task, and an error aborts the stream. Once the queue
    ///    is drained, `init_loser_tree` is called.
    /// 2. **Merge loop**: when the tree is flagged as not adjusted, the stream at
    ///    `loser_tree[0]` is polled and `update_loser_tree` is called. The cursor
    ///    of the stream at `loser_tree[0]` is then advanced and, if that
    ///    succeeds, its row is pushed into `in_progress`. The loop repeats until
    ///    `fetch_reached()` is true, `in_progress` holds `batch_size` rows, or
    ///    `advance_cursors` returns `false`.
    ///
    /// # Returns
    ///
    /// * `Poll::Pending` when a polled input stream is not ready.
    /// * `Poll::Ready(Some(Err(_)))` when polling an input fails; `aborted` is
    ///   set so later calls yield `Poll::Ready(None)`.
    /// * `Poll::Ready(..)` wrapping the transposed result of
    ///   `in_progress.build_record_batch()` otherwise.
    /// * `Poll::Ready(None)` immediately if `aborted` was already set, either by
    ///   an earlier error or because the fetch limit was reached.
    fn poll_next_inner(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        // An earlier error or a satisfied fetch limit ends the stream for good.
        if self.aborted {
            return Poll::Ready(None);
        }

        // Start-up phase. This block is skipped once the loser tree is populated.
        // Until then this function may be entered several times, and it yields
        // `Poll::Pending` whenever the partition being polled is not ready.
        if self.loser_tree.is_empty() {
            while let Some(&pending_idx) = self.uninitiated_partitions.front() {
                match self.maybe_poll_stream(cx, pending_idx) {
                    Poll::Ready(Ok(_)) => {
                        // The partition answered, so it must not be polled again
                        // during start-up: drop it from the queue.
                        self.uninitiated_partitions.pop_front();
                    }
                    Poll::Pending => {
                        // Send the stalled partition to the back of the queue so it
                        // is not polled over and over, which could grow upstream
                        // buffers.
                        self.uninitiated_partitions.rotate_left(1);

                        // Nothing else is guaranteed to wake this task, so it
                        // reschedules itself. A more scheduler-friendly approach
                        // may be worth investigating.
                        cx.waker().wake_by_ref();
                        return Poll::Pending;
                    }
                    Poll::Ready(Err(e)) => {
                        self.aborted = true;
                        return Poll::Ready(Some(Err(e)));
                    }
                }
            }

            // The queue is empty at this point; let go of its spare capacity
            // before building the tree.
            self.uninitiated_partitions.shrink_to_fit();
            self.init_loser_tree();
        }

        // NB: the timer records the elapsed time when it is dropped, so every
        // exit path below is measured without an explicit `done()` call.
        let compute_time = self.metrics.elapsed_compute().clone();
        let _timer = compute_time.timer();

        loop {
            // Bring the loser tree up to date if needed, handing control back to
            // the caller when the winning stream is not ready.
            if !self.loser_tree_adjusted {
                let head = self.loser_tree[0];
                match ready!(self.maybe_poll_stream(cx, head)) {
                    Ok(_) => {
                        self.update_loser_tree();
                    }
                    Err(e) => {
                        self.aborted = true;
                        return Poll::Ready(Some(Err(e)));
                    }
                }
            }

            let stream_idx = self.loser_tree[0];
            let keep_filling = if self.advance_cursors(stream_idx) {
                self.loser_tree_adjusted = false;
                self.in_progress.push_row(stream_idx);

                if self.fetch_reached() {
                    // Fetch limit hit: emit what we have and stop sorting.
                    self.aborted = true;
                    false
                } else {
                    // Keep accumulating rows until a full batch is available.
                    self.in_progress.len() < self.batch_size
                }
            } else {
                // No row was produced; emit whatever has been accumulated.
                false
            };

            if keep_filling {
                continue;
            }

            self.produced += self.in_progress.len();

            let batch = self.in_progress.build_record_batch();
            return Poll::Ready(batch.transpose());
        }
    }