fn poll_next_impl_for_build_left()

in datafusion/core/src/physical_plan/joins/nested_loop_join.rs [419:519]


    /// Produces the next output batch when the left input is the build side.
    ///
    /// Each call waits for the collected left-side data from
    /// `self.inner_table`, then pulls one batch from `self.outer_table` and
    /// joins it against the left data. For `JoinType::Full` a bitmap with one
    /// bit per left row is threaded through every join call; once the outer
    /// stream ends, one final batch is built from that bitmap (presumably the
    /// left rows that never matched - confirm against
    /// `get_final_indices_from_bit_map`).
    ///
    /// Returns:
    /// - `Poll::Pending` while either input is not ready yet,
    /// - `Poll::Ready(Some(Err(_)))` if loading the left data, growing the
    ///   memory reservation, reading the outer stream, or joining fails,
    /// - `Poll::Ready(Some(Ok(batch)))` for every joined batch and for the
    ///   final full-join batch,
    /// - `Poll::Ready(None)` once there is nothing left to emit.
    fn poll_next_impl_for_build_left(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        // `is_exhausted` is only set after `outer_table` has returned `None`
        // and the final full-join batch has been emitted. A finished stream
        // must not be polled again (it may panic or block forever), so stop
        // here instead of touching the inputs a second time.
        if self.is_exhausted {
            return Poll::Ready(None);
        }

        // Fetch the collected left (build) side; time spent waiting for it is
        // accounted as build time.
        let build_timer = self.join_metrics.build_time.timer();
        let (left_data, _) = match ready!(self.inner_table.get(cx)) {
            Ok(data) => data,
            Err(e) => return Poll::Ready(Some(Err(e))),
        };
        build_timer.done();

        // Account for the bitmap's memory exactly once: right before it is
        // created below, and only for full joins (the only case that
        // allocates a non-empty bitmap).
        if self.visited_left_side.is_none() && self.join_type == JoinType::Full {
            // TODO: Replace `ceil` wrapper with stable `div_ceil` after
            // https://github.com/rust-lang/rust/issues/88581
            let visited_bitmap_size = bit_util::ceil(left_data.num_rows(), 8);
            self.reservation.try_grow(visited_bitmap_size)?;
            self.join_metrics.build_mem_used.add(visited_bitmap_size);
        }

        // Lazily create the visited-rows bitmap on the first call.
        let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
            let left_num_rows = left_data.num_rows();
            // Only a full join needs one bit per left row (all initially
            // unset); every other join type gets an empty placeholder.
            if self.join_type == JoinType::Full {
                let mut buffer = BooleanBufferBuilder::new(left_num_rows);
                buffer.append_n(left_num_rows, false);
                buffer
            } else {
                BooleanBufferBuilder::new(0)
            }
        });

        self.outer_table
            .poll_next_unpin(cx)
            .map(|maybe_batch| match maybe_batch {
                Some(Ok(right_batch)) => {
                    // Setting up timer & updating input metrics
                    self.join_metrics.input_batches.add(1);
                    self.join_metrics.input_rows.add(right_batch.num_rows());
                    let timer = self.join_metrics.join_time.timer();

                    let result = join_left_and_right_batch(
                        left_data,
                        &right_batch,
                        self.join_type,
                        self.filter.as_ref(),
                        &self.column_indices,
                        &self.schema,
                        visited_left_side,
                    );

                    // Recording time & updating output metrics (output
                    // counters are only bumped for successful batches).
                    if let Ok(batch) = &result {
                        timer.done();
                        self.join_metrics.output_batches.add(1);
                        self.join_metrics.output_rows.add(batch.num_rows());
                    }

                    Some(result)
                }
                // Forward errors from the outer stream unchanged.
                Some(err) => Some(err),
                None => {
                    // `is_exhausted` is known to be false here because of the
                    // guard at the top of this method.
                    if self.join_type == JoinType::Full {
                        // Only setting up timer, input is exhausted
                        let timer = self.join_metrics.join_time.timer();

                        // Use the global left bitmap to produce the left and
                        // right indices of the final batch.
                        let (left_side, right_side) = get_final_indices_from_bit_map(
                            visited_left_side,
                            self.join_type,
                        );
                        let empty_right_batch =
                            RecordBatch::new_empty(self.outer_table.schema());
                        // Use the left and right indices to produce the batch result
                        let result = build_batch_from_indices(
                            &self.schema,
                            left_data,
                            &empty_right_batch,
                            &left_side,
                            &right_side,
                            &self.column_indices,
                            JoinSide::Left,
                        );
                        // Mark completion so that the next poll returns `None`
                        // without polling the finished outer stream again.
                        self.is_exhausted = true;

                        // Recording time & updating output metrics
                        if let Ok(batch) = &result {
                            timer.done();
                            self.join_metrics.output_batches.add(1);
                            self.join_metrics.output_rows.add(batch.num_rows());
                        }

                        Some(result)
                    } else {
                        // End of the join loop: non-full joins have nothing
                        // more to emit once the outer stream ends.
                        None
                    }
                }
            })
    }