in datafusion/physical-plan/src/joins/sort_merge_join.rs [1129:1311]
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let join_time = self.join_metrics.join_time.clone();
let _timer = join_time.timer();
loop {
match &self.state {
SortMergeJoinState::Init => {
let streamed_exhausted =
self.streamed_state == StreamedState::Exhausted;
let buffered_exhausted =
self.buffered_state == BufferedState::Exhausted;
self.state = if streamed_exhausted && buffered_exhausted {
SortMergeJoinState::Exhausted
} else {
match self.current_ordering {
Ordering::Less | Ordering::Equal => {
if !streamed_exhausted {
if self.filter.is_some()
&& matches!(
self.join_type,
JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftMark
| JoinType::Right
| JoinType::LeftAnti
| JoinType::RightAnti
| JoinType::Full
)
{
self.freeze_all()?;
// If join is filtered and there is joined tuples waiting
// to be filtered
if !self
.staging_output_record_batches
.batches
.is_empty()
{
// Apply filter on joined tuples and get filtered batch
let out_filtered_batch =
self.filter_joined_batch()?;
// Append filtered batch to the output buffer
self.output = concat_batches(
&self.schema(),
vec![&self.output, &out_filtered_batch],
)?;
// Send to output if the output buffer surpassed the `batch_size`
if self.output.num_rows() >= self.batch_size {
let record_batch = std::mem::replace(
&mut self.output,
RecordBatch::new_empty(
out_filtered_batch.schema(),
),
);
return Poll::Ready(Some(Ok(
record_batch,
)));
}
}
}
self.streamed_joined = false;
self.streamed_state = StreamedState::Init;
}
}
Ordering::Greater => {
if !buffered_exhausted {
self.buffered_joined = false;
self.buffered_state = BufferedState::Init;
}
}
}
SortMergeJoinState::Polling
};
}
SortMergeJoinState::Polling => {
if ![StreamedState::Exhausted, StreamedState::Ready]
.contains(&self.streamed_state)
{
match self.poll_streamed_row(cx)? {
Poll::Ready(_) => {}
Poll::Pending => return Poll::Pending,
}
}
if ![BufferedState::Exhausted, BufferedState::Ready]
.contains(&self.buffered_state)
{
match self.poll_buffered_batches(cx)? {
Poll::Ready(_) => {}
Poll::Pending => return Poll::Pending,
}
}
let streamed_exhausted =
self.streamed_state == StreamedState::Exhausted;
let buffered_exhausted =
self.buffered_state == BufferedState::Exhausted;
if streamed_exhausted && buffered_exhausted {
self.state = SortMergeJoinState::Exhausted;
continue;
}
self.current_ordering = self.compare_streamed_buffered()?;
self.state = SortMergeJoinState::JoinOutput;
}
SortMergeJoinState::JoinOutput => {
self.join_partial()?;
if self.output_size < self.batch_size {
if self.buffered_data.scanning_finished() {
self.buffered_data.scanning_reset();
self.state = SortMergeJoinState::Init;
}
} else {
self.freeze_all()?;
if !self.staging_output_record_batches.batches.is_empty() {
let record_batch = self.output_record_batch_and_reset()?;
// For non-filtered join output whenever the target output batch size
// is hit. For filtered join its needed to output on later phase
// because target output batch size can be hit in the middle of
// filtering causing the filtering to be incomplete and causing
// correctness issues
if self.filter.is_some()
&& matches!(
self.join_type,
JoinType::Left
| JoinType::LeftSemi
| JoinType::Right
| JoinType::LeftAnti
| JoinType::RightAnti
| JoinType::LeftMark
| JoinType::Full
)
{
continue;
}
return Poll::Ready(Some(Ok(record_batch)));
}
return Poll::Pending;
}
}
SortMergeJoinState::Exhausted => {
self.freeze_all()?;
// if there is still something not processed
if !self.staging_output_record_batches.batches.is_empty() {
if self.filter.is_some()
&& matches!(
self.join_type,
JoinType::Left
| JoinType::LeftSemi
| JoinType::Right
| JoinType::LeftAnti
| JoinType::RightAnti
| JoinType::Full
| JoinType::LeftMark
)
{
let record_batch = self.filter_joined_batch()?;
return Poll::Ready(Some(Ok(record_batch)));
} else {
let record_batch = self.output_record_batch_and_reset()?;
return Poll::Ready(Some(Ok(record_batch)));
}
} else if self.output.num_rows() > 0 {
// if processed but still not outputted because it didn't hit batch size before
let schema = self.output.schema();
let record_batch = std::mem::replace(
&mut self.output,
RecordBatch::new_empty(schema),
);
return Poll::Ready(Some(Ok(record_batch)));
} else {
return Poll::Ready(None);
}
}
}
}
}