in datafusion/core/src/physical_plan/joins/nested_loop_join.rs [419:519]
/// Polls the next output batch when the left side is the build side.
///
/// Waits for the left input (`inner_table`) to be collected. Then, for every
/// batch pulled from the right input (`outer_table`), joins it against the
/// collected left data via `join_left_and_right_batch`.
///
/// For `JoinType::Full`, a bitmap over the left rows (`visited_left_side`) is
/// kept across right batches. Once the right input returns `None`, one extra
/// batch is built from that bitmap (via `get_final_indices_from_bit_map`)
/// against an empty right batch, and the stream then ends.
///
/// Returns `Poll::Pending` while an input is pending. Returns `Some(Err(_))`
/// on errors from either input, from the join itself, or from the bitmap's
/// memory reservation. Returns `None` when finished.
fn poll_next_impl_for_build_left(
    &mut self,
    cx: &mut std::task::Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
    // The final FULL-join batch has already been emitted, and `outer_table`
    // has already returned `None`. Finish here instead of polling it again:
    // the `Stream` contract allows a completed stream to panic or misbehave
    // when polled after returning `None`.
    if self.is_exhausted {
        return Poll::Ready(None);
    }

    // Wait until the whole left (build) side is available.
    let build_timer = self.join_metrics.build_time.timer();
    let (left_data, _) = match ready!(self.inner_table.get(cx)) {
        Ok(data) => data,
        Err(e) => return Poll::Ready(Some(Err(e))),
    };
    build_timer.done();

    // Only a FULL join allocates a real visited-left bitmap. Account for its
    // memory before the bitmap is created below. If the reservation fails,
    // `visited_left_side` stays `None`, so a later poll retries it.
    if self.visited_left_side.is_none() && self.join_type == JoinType::Full {
        // TODO: Replace `ceil` wrapper with stable `div_ceil` after
        // https://github.com/rust-lang/rust/issues/88581
        let visited_bitmap_size = bit_util::ceil(left_data.num_rows(), 8);
        self.reservation.try_grow(visited_bitmap_size)?;
        self.join_metrics.build_mem_used.add(visited_bitmap_size);
    }

    // Create the bitmap once and reuse it for every right batch. For FULL
    // joins, every left row starts unvisited (`false`). Other join types get
    // an empty builder.
    let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
        let left_num_rows = left_data.num_rows();
        if self.join_type == JoinType::Full {
            let mut buffer = BooleanBufferBuilder::new(left_num_rows);
            buffer.append_n(left_num_rows, false);
            buffer
        } else {
            BooleanBufferBuilder::new(0)
        }
    });

    self.outer_table
        .poll_next_unpin(cx)
        .map(|maybe_batch| match maybe_batch {
            Some(Ok(right_batch)) => {
                // Setting up timer & updating input metrics
                self.join_metrics.input_batches.add(1);
                self.join_metrics.input_rows.add(right_batch.num_rows());
                let timer = self.join_metrics.join_time.timer();
                let result = join_left_and_right_batch(
                    left_data,
                    &right_batch,
                    self.join_type,
                    self.filter.as_ref(),
                    &self.column_indices,
                    &self.schema,
                    visited_left_side,
                );
                // Recording time & updating output metrics
                if let Ok(batch) = &result {
                    timer.done();
                    self.join_metrics.output_batches.add(1);
                    self.join_metrics.output_rows.add(batch.num_rows());
                }
                Some(result)
            }
            // Propagate errors from the right input unchanged.
            Some(err) => Some(err),
            None => {
                if self.join_type == JoinType::Full && !self.is_exhausted {
                    // Only setting up timer, input is exhausted
                    let timer = self.join_metrics.join_time.timer();
                    // Use the visited-left bitmap to produce the final left
                    // and right indices for the remaining output.
                    let (left_side, right_side) = get_final_indices_from_bit_map(
                        visited_left_side,
                        self.join_type,
                    );
                    let empty_right_batch =
                        RecordBatch::new_empty(self.outer_table.schema());
                    // Use the left and right indices to build the final batch.
                    let result = build_batch_from_indices(
                        &self.schema,
                        left_data,
                        &empty_right_batch,
                        &left_side,
                        &right_side,
                        &self.column_indices,
                        JoinSide::Left,
                    );
                    // Mark completion so later polls return `None` without
                    // touching `outer_table` again (see the guard above).
                    self.is_exhausted = true;
                    // Recording time & updating output metrics
                    if let Ok(batch) = &result {
                        timer.done();
                        self.join_metrics.output_batches.add(1);
                        self.join_metrics.output_rows.add(batch.num_rows());
                    }
                    Some(result)
                } else {
                    // End of the join loop.
                    None
                }
            }
        })
}