in datafusion/core/src/physical_plan/joins/hash_join.rs [1238:1377]
/// Drives one step of the hash join's output stream.
///
/// Each call:
/// 1. Waits for the build (left) side future (`left_fut`); while it is not
///    ready the method returns `Poll::Pending`.
/// 2. Lazily creates the "visited" bitmap over build-side rows. For join types
///    where `need_produce_result_in_final` is true, the bitmap has one bit per
///    build-side row and `ceil(rows / 8)` is first requested from
///    `self.reservation`.
/// 3. Polls the probe (right) stream. For every probe batch it computes the
///    matching index pairs, marks matched build-side rows in the bitmap,
///    adjusts the indices for the join type, and builds the output batch.
/// 4. Once the probe stream ends, join types that need it emit exactly one
///    final batch derived from the bitmap (guarded by `is_exhausted`); after
///    that the stream yields `None`.
///
/// # Errors
///
/// Yields `Some(Err(_))` when the build side failed, when the memory
/// reservation cannot grow, when the join indices cannot be computed (wrapped
/// in `DataFusionError::Execution`), when an output batch cannot be built, or
/// when the probe stream itself yields an error.
fn poll_next_impl(
    &mut self,
    cx: &mut std::task::Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
    let build_timer = self.join_metrics.build_time.timer();
    // `ready!` returns `Poll::Pending` from this method until the build side is done.
    let left_data = match ready!(self.left_fut.get(cx)) {
        Ok(left_data) => left_data,
        Err(e) => return Poll::Ready(Some(Err(e))),
    };
    build_timer.done();

    // `join_type` is not modified in this method, so decide once whether
    // build-side rows have to be tracked for a final output batch.
    let track_visited = need_produce_result_in_final(self.join_type);

    // Reserve memory for the visited_left_side bitmap in case it hasn't been
    // initialized yet and the join type requires it.
    if self.visited_left_side.is_none() && track_visited {
        // TODO: Replace `ceil` wrapper with stable `div_ceil` after
        // https://github.com/rust-lang/rust/issues/88581
        let visited_bitmap_size = bit_util::ceil(left_data.1.num_rows(), 8);
        // `?` converts a reservation failure into `Poll::Ready(Some(Err(_)))`.
        self.reservation.try_grow(visited_bitmap_size)?;
        self.join_metrics.build_mem_used.add(visited_bitmap_size);
    }

    let visited_left_side = self.visited_left_side.get_or_insert_with(|| {
        let num_rows = left_data.1.num_rows();
        if track_visited {
            // These join types need the bitmap to identify which rows have been
            // matched or unmatched:
            // - `left semi`: produce the matched rows of the left side
            // - `left`:      produce the unmatched left rows padded with nulls
            // - `left anti`: produce the unmatched rows of the left side
            // - `full`:      produce the unmatched left rows padded with nulls
            let mut buffer = BooleanBufferBuilder::new(num_rows);
            buffer.append_n(num_rows, false);
            buffer
        } else {
            // No tracking needed: keep an empty placeholder bitmap.
            BooleanBufferBuilder::new(0)
        }
    });

    let mut hashes_buffer = vec![];
    self.right
        .poll_next_unpin(cx)
        .map(|maybe_batch| match maybe_batch {
            // One right (probe) batch in the join loop.
            Some(Ok(batch)) => {
                self.join_metrics.input_batches.add(1);
                self.join_metrics.input_rows.add(batch.num_rows());
                let timer = self.join_metrics.join_time.timer();

                // Get the matched index pairs for the `on` condition.
                let left_right_indices = build_equal_condition_join_indices(
                    &left_data.0,
                    &left_data.1,
                    &batch,
                    &self.on_left,
                    &self.on_right,
                    &self.random_state,
                    self.null_equals_null,
                    &mut hashes_buffer,
                    self.filter.as_ref(),
                    JoinSide::Left,
                );

                let result = match left_right_indices {
                    Ok((left_side, right_side)) => {
                        // Only left, full, left semi and left anti joins need
                        // the left bitmap to be maintained.
                        if track_visited {
                            left_side.iter().flatten().for_each(|x| {
                                visited_left_side.set_bit(x as usize, true);
                            });
                        }

                        // Adjust both index arrays based on the join type.
                        let (left_side, right_side) = adjust_indices_by_join_type(
                            left_side,
                            right_side,
                            batch.num_rows(),
                            self.join_type,
                        );

                        let result = build_batch_from_indices(
                            &self.schema,
                            &left_data.1,
                            &batch,
                            &left_side,
                            &right_side,
                            &self.column_indices,
                            JoinSide::Left,
                        );

                        // Record output metrics from the batch that was actually
                        // produced: the joined batch may have a different number
                        // of rows than the probe batch, and nothing is emitted
                        // when building it failed.
                        if let Ok(ref joined) = result {
                            self.join_metrics.output_batches.add(1);
                            self.join_metrics.output_rows.add(joined.num_rows());
                        }
                        Some(result)
                    }
                    Err(err) => Some(Err(DataFusionError::Execution(format!(
                        "Fail to build join indices in HashJoinExec, error:{err}",
                    )))),
                };
                timer.done();
                result
            }
            // The probe side is finished.
            None => {
                let timer = self.join_metrics.join_time.timer();
                if track_visited && !self.is_exhausted {
                    // Use the global left bitmap to produce the final left and
                    // right indices.
                    let (left_side, right_side) =
                        get_final_indices_from_bit_map(visited_left_side, self.join_type);
                    let empty_right_batch = RecordBatch::new_empty(self.right.schema());

                    // Use the left and right indices to produce the final batch.
                    let result = build_batch_from_indices(
                        &self.schema,
                        &left_data.1,
                        &empty_right_batch,
                        &left_side,
                        &right_side,
                        &self.column_indices,
                        JoinSide::Left,
                    );

                    if let Ok(ref batch) = result {
                        // NOTE(review): this synthesized batch is also counted as
                        // input; kept as-is, confirm whether that is intended.
                        self.join_metrics.input_batches.add(1);
                        self.join_metrics.input_rows.add(batch.num_rows());
                        self.join_metrics.output_batches.add(1);
                        self.join_metrics.output_rows.add(batch.num_rows());
                    }
                    timer.done();
                    // Make sure the final batch is emitted only once.
                    self.is_exhausted = true;
                    Some(result)
                } else {
                    // End of the join loop.
                    None
                }
            }
            // Errors from the probe stream are forwarded unchanged.
            Some(err) => Some(err),
        })
}