fn poll_next_impl()

in datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs [1276:1426]


    /// Drives the join by polling `input_stream` until an output batch, an
    /// error, or the end of the input is reached.
    ///
    /// Every input batch arrives tagged with the [`JoinSide`] it came from.
    /// That side is treated as the probe side for the iteration and the other
    /// side as the build side. For each batch this method:
    ///
    /// 1. updates the probe side's own joiner state with the batch,
    /// 2. joins the batch against the build side via `join_with_probe_batch`,
    /// 3. when both sorted filter expressions and the interval graph are
    ///    present, computes a prune length for the build side, collects
    ///    `build_side_determined_results` for that length and prunes the build
    ///    side state by the same length,
    /// 4. combines the outputs of steps 2 and 3 and refreshes the memory
    ///    usage metric and the memory reservation.
    ///
    /// If the combined output is empty, the next input batch is polled.
    ///
    /// Once `input_stream` is exhausted, `build_side_determined_results` is
    /// evaluated over the full buffers of both sides exactly once; afterwards
    /// this method returns `Poll::Ready(None)` without polling the input again.
    ///
    /// # Errors
    ///
    /// An error yielded by `input_stream`, or raised by any of the steps above,
    /// is returned as `Poll::Ready(Some(Err(_)))`.
    fn poll_next_impl(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        // `final_result` is only set after `input_stream` reported exhaustion.
        // The `Stream` contract does not define what polling an exhausted
        // stream does, so finish here instead of polling it again.
        if self.final_result {
            return Poll::Ready(None);
        }
        loop {
            // Poll the next batch from `input_stream`:
            match self.input_stream.poll_next_unpin(cx) {
                // Batch is available
                Poll::Ready(Some((side, Ok(probe_batch)))) => {
                    // The side the RecordBatch comes from becomes the probe side;
                    // the opposite side plays the build role for this iteration.
                    let (
                        probe_hash_joiner,
                        build_hash_joiner,
                        probe_side_sorted_filter_expr,
                        build_side_sorted_filter_expr,
                        probe_side_metrics,
                    ) = if side.eq(&JoinSide::Left) {
                        (
                            &mut self.left,
                            &mut self.right,
                            &mut self.left_sorted_filter_expr,
                            &mut self.right_sorted_filter_expr,
                            &mut self.metrics.left,
                        )
                    } else {
                        (
                            &mut self.right,
                            &mut self.left,
                            &mut self.right_sorted_filter_expr,
                            &mut self.left_sorted_filter_expr,
                            &mut self.metrics.right,
                        )
                    };
                    // Update the metrics for the stream that was polled:
                    probe_side_metrics.input_batches.add(1);
                    probe_side_metrics.input_rows.add(probe_batch.num_rows());
                    // Update the probe side's own joiner state with the new batch:
                    probe_hash_joiner
                        .update_internal_state(&probe_batch, &self.random_state)?;
                    // Join the two sides:
                    let equal_result = join_with_probe_batch(
                        build_hash_joiner,
                        probe_hash_joiner,
                        &self.schema,
                        self.join_type,
                        self.filter.as_ref(),
                        &probe_batch,
                        &self.column_indices,
                        &self.random_state,
                        self.null_equals_null,
                    )?;
                    // Increment the offset for the probe hash joiner only after
                    // the join above has run with the previous offset:
                    probe_hash_joiner.offset += probe_batch.num_rows();

                    // Pruning is attempted only when both sorted filter
                    // expressions and the interval graph are available.
                    let anti_result = if let (
                        Some(build_side_sorted_filter_expr),
                        Some(probe_side_sorted_filter_expr),
                        Some(graph),
                    ) = (
                        build_side_sorted_filter_expr.as_mut(),
                        probe_side_sorted_filter_expr.as_mut(),
                        self.graph.as_mut(),
                    ) {
                        // Calculate filter intervals:
                        calculate_filter_expr_intervals(
                            &build_hash_joiner.input_buffer,
                            build_side_sorted_filter_expr,
                            &probe_batch,
                            probe_side_sorted_filter_expr,
                        )?;
                        let prune_length = build_hash_joiner
                            .calculate_prune_length_with_probe_batch(
                                build_side_sorted_filter_expr,
                                probe_side_sorted_filter_expr,
                                graph,
                            )?;

                        if prune_length > 0 {
                            // Collect the results first: they are computed from
                            // the build side state that is pruned right after.
                            let res = build_side_determined_results(
                                build_hash_joiner,
                                &self.schema,
                                prune_length,
                                probe_batch.schema(),
                                self.join_type,
                                &self.column_indices,
                            )?;
                            build_hash_joiner.prune_internal_state(prune_length)?;
                            res
                        } else {
                            None
                        }
                    } else {
                        None
                    };

                    // Combine results:
                    let result =
                        combine_two_batches(&self.schema, equal_result, anti_result)?;
                    // Report the current size and resize the memory reservation
                    // to match it:
                    let capacity = self.size();
                    self.metrics.stream_memory_usage.set(capacity);
                    self.reservation.lock().try_resize(capacity)?;
                    // Update the metrics if we have a batch; otherwise, continue the loop.
                    if let Some(batch) = &result {
                        self.metrics.output_batches.add(1);
                        self.metrics.output_rows.add(batch.num_rows());
                        return Poll::Ready(Ok(result).transpose());
                    }
                }
                Poll::Ready(Some((_, Err(e)))) => return Poll::Ready(Some(Err(e))),
                Poll::Ready(None) => {
                    // Set the flag before the fallible steps below, so that a
                    // failure here also ends the stream on the next poll.
                    self.final_result = true;
                    // Get the left side results:
                    let left_result = build_side_determined_results(
                        &self.left,
                        &self.schema,
                        self.left.input_buffer.num_rows(),
                        self.right.input_buffer.schema(),
                        self.join_type,
                        &self.column_indices,
                    )?;
                    // Get the right side results:
                    let right_result = build_side_determined_results(
                        &self.right,
                        &self.schema,
                        self.right.input_buffer.num_rows(),
                        self.left.input_buffer.schema(),
                        self.join_type,
                        &self.column_indices,
                    )?;

                    // Combine the left and right results:
                    let result =
                        combine_two_batches(&self.schema, left_result, right_result)?;

                    // Update the metrics if there is a final batch:
                    if let Some(batch) = &result {
                        self.metrics.output_batches.add(1);
                        self.metrics.output_rows.add(batch.num_rows());
                    }
                    // `Ok(None).transpose()` is `None`, so an empty final result
                    // completes the stream here without re-polling the
                    // exhausted input.
                    return Poll::Ready(Ok(result).transpose());
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }