in datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs [1276:1426]
/// Drives the join forward and yields the next output batch, if any.
///
/// Each iteration polls `input_stream`, which yields `(side, batch)` pairs.
/// The side a batch arrives from is treated as the probe side and the other
/// side as the build side. For every incoming batch this method:
///
/// 1. updates the input metrics and the probe-side joiner's internal state,
/// 2. joins the batch against the build side via `join_with_probe_batch`,
/// 3. when sorted filter expressions for both sides and the expression
///    graph are all available, computes a prune length for the build side,
///    emits the results `build_side_determined_results` reports for that
///    prefix, and prunes the build side's internal state,
/// 4. combines both partial results, then refreshes the memory-usage metric
///    and the memory reservation.
///
/// Batches that produce no output do not end the poll; the loop simply
/// polls for more input. When the input is exhausted, the remaining
/// results of both sides are computed once, `final_result` is set, and
/// every later call returns `Poll::Ready(None)` without touching
/// `input_stream` again.
///
/// # Errors
///
/// Any error from the input stream or from one of the helpers is returned
/// as `Poll::Ready(Some(Err(_)))`.
fn poll_next_impl(
    &mut self,
    cx: &mut std::task::Context<'_>,
) -> Poll<Option<Result<RecordBatch>>> {
    // The final result has already been produced, so this stream is done.
    // Checking this *before* polling avoids polling `input_stream` after it
    // has reported exhaustion, which the `Stream` contract leaves
    // unspecified for streams that are not fused.
    if self.final_result {
        return Poll::Ready(None);
    }
    loop {
        // Poll the next batch from `input_stream`:
        match self.input_stream.poll_next_unpin(cx) {
            // Batch is available
            Poll::Ready(Some((side, Ok(probe_batch)))) => {
                // Determine which stream should be polled next. The side the
                // RecordBatch comes from becomes the probe side.
                let (
                    probe_hash_joiner,
                    build_hash_joiner,
                    probe_side_sorted_filter_expr,
                    build_side_sorted_filter_expr,
                    probe_side_metrics,
                ) = if side.eq(&JoinSide::Left) {
                    (
                        &mut self.left,
                        &mut self.right,
                        &mut self.left_sorted_filter_expr,
                        &mut self.right_sorted_filter_expr,
                        &mut self.metrics.left,
                    )
                } else {
                    (
                        &mut self.right,
                        &mut self.left,
                        &mut self.right_sorted_filter_expr,
                        &mut self.left_sorted_filter_expr,
                        &mut self.metrics.right,
                    )
                };
                // Update the metrics for the stream that was polled:
                probe_side_metrics.input_batches.add(1);
                probe_side_metrics.input_rows.add(probe_batch.num_rows());
                // Update the internal state of the joiner on the side the
                // batch came from (the probe side for this iteration):
                probe_hash_joiner
                    .update_internal_state(&probe_batch, &self.random_state)?;
                // Join the two sides:
                let equal_result = join_with_probe_batch(
                    build_hash_joiner,
                    probe_hash_joiner,
                    &self.schema,
                    self.join_type,
                    self.filter.as_ref(),
                    &probe_batch,
                    &self.column_indices,
                    &self.random_state,
                    self.null_equals_null,
                )?;
                // Increment the offset for the probe hash joiner:
                probe_hash_joiner.offset += probe_batch.num_rows();
                // Pruning is only attempted when both sorted filter
                // expressions and the expression graph are present.
                let anti_result = if let (
                    Some(build_side_sorted_filter_expr),
                    Some(probe_side_sorted_filter_expr),
                    Some(graph),
                ) = (
                    build_side_sorted_filter_expr.as_mut(),
                    probe_side_sorted_filter_expr.as_mut(),
                    self.graph.as_mut(),
                ) {
                    // Calculate filter intervals:
                    calculate_filter_expr_intervals(
                        &build_hash_joiner.input_buffer,
                        build_side_sorted_filter_expr,
                        &probe_batch,
                        probe_side_sorted_filter_expr,
                    )?;
                    let prune_length = build_hash_joiner
                        .calculate_prune_length_with_probe_batch(
                            build_side_sorted_filter_expr,
                            probe_side_sorted_filter_expr,
                            graph,
                        )?;
                    if prune_length > 0 {
                        // Collect the results for the prefix that is about
                        // to be pruned *before* dropping it from the state.
                        let res = build_side_determined_results(
                            build_hash_joiner,
                            &self.schema,
                            prune_length,
                            probe_batch.schema(),
                            self.join_type,
                            &self.column_indices,
                        )?;
                        build_hash_joiner.prune_internal_state(prune_length)?;
                        res
                    } else {
                        None
                    }
                } else {
                    None
                };
                // Combine results:
                let result =
                    combine_two_batches(&self.schema, equal_result, anti_result)?;
                // Report the current size and resize the memory reservation:
                let capacity = self.size();
                self.metrics.stream_memory_usage.set(capacity);
                self.reservation.lock().try_resize(capacity)?;
                // Update the metrics if we have a batch; otherwise, continue the loop.
                if let Some(batch) = &result {
                    self.metrics.output_batches.add(1);
                    self.metrics.output_rows.add(batch.num_rows());
                    return Poll::Ready(Ok(result).transpose());
                }
            }
            Poll::Ready(Some((_, Err(e)))) => return Poll::Ready(Some(Err(e))),
            Poll::Ready(None) => {
                // The input is exhausted. Mark the final result as produced
                // so that subsequent calls return `Poll::Ready(None)`
                // without polling the input again.
                self.final_result = true;
                // Get the left side results:
                let left_result = build_side_determined_results(
                    &self.left,
                    &self.schema,
                    self.left.input_buffer.num_rows(),
                    self.right.input_buffer.schema(),
                    self.join_type,
                    &self.column_indices,
                )?;
                // Get the right side results:
                let right_result = build_side_determined_results(
                    &self.right,
                    &self.schema,
                    self.right.input_buffer.num_rows(),
                    self.left.input_buffer.schema(),
                    self.join_type,
                    &self.column_indices,
                )?;
                // Combine the left and right results:
                let result =
                    combine_two_batches(&self.schema, left_result, right_result)?;
                // Update the metrics if there is a final batch:
                if let Some(batch) = &result {
                    self.metrics.output_batches.add(1);
                    self.metrics.output_rows.add(batch.num_rows());
                }
                // Return unconditionally: either the final batch, or
                // `Poll::Ready(None)` when there is nothing left to emit.
                // Looping here would poll the exhausted input a second time.
                return Poll::Ready(Ok(result).transpose());
            }
            Poll::Pending => return Poll::Pending,
        }
    }
}