fn poll_next()

in datafusion/physical-plan/src/joins/sort_merge_join.rs [1129:1311]


    /// Drives the join state machine until a batch can be returned, one of the
    /// inputs reports `Pending`, an error is propagated, or the stream ends.
    ///
    /// Each loop iteration handles the current `SortMergeJoinState`:
    ///
    /// * `Init` - moves to `Exhausted` when both inputs are exhausted; otherwise
    ///   resets the side selected by `current_ordering` and moves to `Polling`.
    ///   For filtered joins of the listed join types, staged rows are filtered
    ///   and appended to `output` here, and `output` is emitted once it holds at
    ///   least `batch_size` rows.
    /// * `Polling` - polls whichever side is neither `Ready` nor `Exhausted`,
    ///   then compares the two sides and moves to `JoinOutput`.
    /// * `JoinOutput` - calls `join_partial`, then either goes back to `Init`
    ///   (buffered scan finished) or freezes and possibly emits a batch.
    /// * `Exhausted` - flushes staged batches and the `output` buffer, then
    ///   returns `None`.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // The metric is cloned into a local and the timer guard is kept alive in
        // `_timer` for the remainder of this call.
        let join_time = self.join_metrics.join_time.clone();
        let _timer = join_time.timer();
        loop {
            match &self.state {
                SortMergeJoinState::Init => {
                    let streamed_done = self.streamed_state == StreamedState::Exhausted;
                    let buffered_done = self.buffered_state == BufferedState::Exhausted;

                    // Nothing left on either side: go straight to the final flush.
                    if streamed_done && buffered_done {
                        self.state = SortMergeJoinState::Exhausted;
                        continue;
                    }

                    match self.current_ordering {
                        Ordering::Greater => {
                            if !buffered_done {
                                self.buffered_joined = false;
                                self.buffered_state = BufferedState::Init;
                            }
                        }
                        Ordering::Less | Ordering::Equal => {
                            if !streamed_done {
                                let is_filtered_join = self.filter.is_some()
                                    && matches!(
                                        self.join_type,
                                        JoinType::Left
                                            | JoinType::Right
                                            | JoinType::Full
                                            | JoinType::LeftSemi
                                            | JoinType::LeftAnti
                                            | JoinType::RightAnti
                                            | JoinType::LeftMark
                                    );

                                if is_filtered_join {
                                    self.freeze_all()?;

                                    // Joined tuples are staged and still waiting for
                                    // the join filter to be applied.
                                    let has_staged = !self
                                        .staging_output_record_batches
                                        .batches
                                        .is_empty();
                                    if has_staged {
                                        // Run the filter over the staged tuples.
                                        let filtered = self.filter_joined_batch()?;

                                        // Grow the output buffer by the filtered rows.
                                        let merged = concat_batches(
                                            &self.schema(),
                                            vec![&self.output, &filtered],
                                        )?;
                                        self.output = merged;

                                        // Emit once the buffer holds at least
                                        // `batch_size` rows. The state is left as
                                        // `Init`, so the next poll re-enters this arm.
                                        if self.output.num_rows() >= self.batch_size {
                                            let ready_batch = std::mem::replace(
                                                &mut self.output,
                                                RecordBatch::new_empty(filtered.schema()),
                                            );
                                            return Poll::Ready(Some(Ok(ready_batch)));
                                        }
                                    }
                                }

                                self.streamed_joined = false;
                                self.streamed_state = StreamedState::Init;
                            }
                        }
                    }

                    self.state = SortMergeJoinState::Polling;
                }
                SortMergeJoinState::Polling => {
                    // Only poll a side that is neither exhausted nor already ready.
                    let streamed_settled = self.streamed_state
                        == StreamedState::Exhausted
                        || self.streamed_state == StreamedState::Ready;
                    if !streamed_settled && self.poll_streamed_row(cx)?.is_pending() {
                        return Poll::Pending;
                    }

                    let buffered_settled = self.buffered_state
                        == BufferedState::Exhausted
                        || self.buffered_state == BufferedState::Ready;
                    if !buffered_settled
                        && self.poll_buffered_batches(cx)?.is_pending()
                    {
                        return Poll::Pending;
                    }

                    let all_input_consumed = self.streamed_state
                        == StreamedState::Exhausted
                        && self.buffered_state == BufferedState::Exhausted;
                    if all_input_consumed {
                        self.state = SortMergeJoinState::Exhausted;
                        continue;
                    }

                    self.current_ordering = self.compare_streamed_buffered()?;
                    self.state = SortMergeJoinState::JoinOutput;
                }
                SortMergeJoinState::JoinOutput => {
                    self.join_partial()?;

                    if self.output_size >= self.batch_size {
                        self.freeze_all()?;

                        if self.staging_output_record_batches.batches.is_empty() {
                            // NOTE(review): no waker registration is visible on this
                            // path - confirm the task is polled again after this.
                            return Poll::Pending;
                        }

                        let record_batch = self.output_record_batch_and_reset()?;

                        // A non-filtered join emits as soon as the target batch size
                        // is reached. A filtered join must emit in a later phase:
                        // the size limit can be hit part-way through filtering,
                        // which would leave the filtering incomplete and produce
                        // incorrect results.
                        let is_filtered_join = self.filter.is_some()
                            && matches!(
                                self.join_type,
                                JoinType::Left
                                    | JoinType::Right
                                    | JoinType::Full
                                    | JoinType::LeftSemi
                                    | JoinType::LeftAnti
                                    | JoinType::RightAnti
                                    | JoinType::LeftMark
                            );
                        if is_filtered_join {
                            continue;
                        }

                        return Poll::Ready(Some(Ok(record_batch)));
                    } else if self.buffered_data.scanning_finished() {
                        self.buffered_data.scanning_reset();
                        self.state = SortMergeJoinState::Init;
                    }
                }
                SortMergeJoinState::Exhausted => {
                    self.freeze_all()?;

                    // Staged batches that have not been turned into output yet.
                    if !self.staging_output_record_batches.batches.is_empty() {
                        let is_filtered_join = self.filter.is_some()
                            && matches!(
                                self.join_type,
                                JoinType::Left
                                    | JoinType::Right
                                    | JoinType::Full
                                    | JoinType::LeftSemi
                                    | JoinType::LeftAnti
                                    | JoinType::RightAnti
                                    | JoinType::LeftMark
                            );
                        let record_batch = if is_filtered_join {
                            self.filter_joined_batch()?
                        } else {
                            self.output_record_batch_and_reset()?
                        };
                        return Poll::Ready(Some(Ok(record_batch)));
                    }

                    // Nothing staged and nothing buffered: the stream is finished.
                    if self.output.num_rows() == 0 {
                        return Poll::Ready(None);
                    }

                    // Rows were processed earlier but never emitted because the
                    // buffer did not reach the batch size; hand them out now and
                    // leave an empty batch of the same schema behind.
                    let schema = self.output.schema();
                    let leftover = std::mem::replace(
                        &mut self.output,
                        RecordBatch::new_empty(schema),
                    );
                    return Poll::Ready(Some(Ok(leftover)));
                }
            }
        }
    }