fn poll_next_impl()

in datafusion/core/src/physical_plan/joins/hash_join.rs [1238:1377]


    /// Drives one step of the hash join and yields the next output batch.
    ///
    /// Each call:
    /// 1. Waits for the build (left) side via `left_fut`; returns
    ///    `Poll::Pending` until it is ready and forwards its error if it failed.
    /// 2. Lazily creates the `visited_left_side` bitmap. When the join type
    ///    needs a final pass (`need_produce_result_in_final`), memory for it is
    ///    reserved first and a reservation failure is propagated to the caller.
    /// 3. Polls the probe (right) stream:
    ///    * `Some(Ok(batch))` - joins the batch against the build side and
    ///      returns the joined batch.
    ///    * `None` - on the first occurrence, for join types that need it,
    ///      emits one last batch derived from the visited bitmap; afterwards
    ///      returns `None` to end the stream.
    ///    * `Some(Err(_))` - forwarded unchanged.
    fn poll_next_impl(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        // Time spent waiting for the build side is attributed to `build_time`.
        let build_timer = self.join_metrics.build_time.timer();
        let left_data = match ready!(self.left_fut.get(cx)) {
            Ok(left_data) => left_data,
            Err(e) => return Poll::Ready(Some(Err(e))),
        };
        build_timer.done();

        // Reserve memory for the visited_left_side bitmap in case it hasn't been
        // initialized yet and the join type requires it.
        if self.visited_left_side.is_none()
            && need_produce_result_in_final(self.join_type)
        {
            // TODO: Replace `ceil` wrapper with stable `div_ceil` after
            // https://github.com/rust-lang/rust/issues/88581
            // One bit per build-side row, rounded up to whole groups of 8.
            let visited_bitmap_size = bit_util::ceil(left_data.1.num_rows(), 8);
            self.reservation.try_grow(visited_bitmap_size)?;
            self.join_metrics.build_mem_used.add(visited_bitmap_size);
        }

        let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
            let num_rows = left_data.1.num_rows();
            if need_produce_result_in_final(self.join_type) {
                // These join types need the bitmap to identify which rows have been matched or unmatched.
                // For the `left semi` join, need to use the bitmap to produce the matched row in the left side
                // For the `left` join, need to use the bitmap to produce the unmatched row in the left side with null
                // For the `left anti` join, need to use the bitmap to produce the unmatched row in the left side
                // For the `full` join, need to use the bitmap to produce the unmatched row in the left side with null
                let mut buffer = BooleanBufferBuilder::new(num_rows);
                buffer.append_n(num_rows, false);
                buffer
            } else {
                // Bitmap is never consulted for the other join types.
                BooleanBufferBuilder::new(0)
            }
        });
        let mut hashes_buffer = vec![];
        self.right
            .poll_next_unpin(cx)
            .map(|maybe_batch| match maybe_batch {
                // one right batch in the join loop
                Some(Ok(batch)) => {
                    self.join_metrics.input_batches.add(1);
                    self.join_metrics.input_rows.add(batch.num_rows());
                    let timer = self.join_metrics.join_time.timer();

                    // get the matched two indices for the on condition
                    let left_right_indices = build_equal_condition_join_indices(
                        &left_data.0,
                        &left_data.1,
                        &batch,
                        &self.on_left,
                        &self.on_right,
                        &self.random_state,
                        self.null_equals_null,
                        &mut hashes_buffer,
                        self.filter.as_ref(),
                        JoinSide::Left,
                    );

                    let result = match left_right_indices {
                        Ok((left_side, right_side)) => {
                            // set the left bitmap
                            // and only left, full, left semi, left anti need the left bitmap
                            if need_produce_result_in_final(self.join_type) {
                                left_side.iter().flatten().for_each(|x| {
                                    visited_left_side.set_bit(x as usize, true);
                                });
                            }

                            // adjust the two side indices base on the join type
                            let (left_side, right_side) = adjust_indices_by_join_type(
                                left_side,
                                right_side,
                                batch.num_rows(),
                                self.join_type,
                            );

                            let result = build_batch_from_indices(
                                &self.schema,
                                &left_data.1,
                                &batch,
                                &left_side,
                                &right_side,
                                &self.column_indices,
                                JoinSide::Left,
                            );
                            // Record output metrics from the batch that was actually
                            // produced (not the probe batch), and only when building
                            // it succeeded - same policy as the final branch below.
                            if let Ok(ref joined) = result {
                                self.join_metrics.output_batches.add(1);
                                self.join_metrics.output_rows.add(joined.num_rows());
                            }
                            Some(result)
                        }
                        Err(err) => Some(Err(DataFusionError::Execution(format!(
                            "Fail to build join indices in HashJoinExec, error:{err}",
                        )))),
                    };
                    timer.done();
                    result
                }
                None => {
                    let timer = self.join_metrics.join_time.timer();
                    if need_produce_result_in_final(self.join_type) && !self.is_exhausted
                    {
                        // use the global left bitmap to produce the left indices and right indices
                        let (left_side, right_side) = get_final_indices_from_bit_map(
                            visited_left_side,
                            self.join_type,
                        );
                        let empty_right_batch =
                            RecordBatch::new_empty(self.right.schema());
                        // use the left and right indices to produce the batch result
                        let result = build_batch_from_indices(
                            &self.schema,
                            &left_data.1,
                            &empty_right_batch,
                            &left_side,
                            &right_side,
                            &self.column_indices,
                            JoinSide::Left,
                        );

                        if let Ok(ref batch) = result {
                            // NOTE(review): the generated batch is also counted as
                            // probe input here - confirm this is intended.
                            self.join_metrics.input_batches.add(1);
                            self.join_metrics.input_rows.add(batch.num_rows());

                            self.join_metrics.output_batches.add(1);
                            self.join_metrics.output_rows.add(batch.num_rows());
                        }
                        timer.done();
                        // Ensure the final batch is emitted at most once.
                        self.is_exhausted = true;
                        Some(result)
                    } else {
                        // end of the join loop
                        None
                    }
                }
                // Probe-side stream error: pass it through untouched.
                Some(err) => Some(err),
            })
    }