in datafusion/physical-plan/src/joins/sort_merge_join.rs [1592:1707]
/// Performs one step of joining the current streamed row against the buffered
/// rows currently being scanned.
///
/// Based on `current_ordering`, `join_type` and whether a join filter is set,
/// this decides which side(s) take part in the step and then either:
///
/// * records nothing and marks the buffered scan as finished,
/// * records the streamed row once, without a real buffered row index, or
/// * walks the buffered rows, recording one output entry per row until the
///   scan finishes or `output_size` reaches `batch_size`.
///
/// Always returns `Ok(())`.
fn join_partial(&mut self) -> Result<()> {
    let has_filter = self.filter.is_some();

    // Work out, in a single pass, the three decisions driving this step:
    //   * `emit_streamed`     - the current streamed row takes part in the output
    //   * `emit_buffered`     - the buffered rows take part in the output
    //   * `mark_row_as_match` - for Mark joins a dummy buffered row id is stored
    //                           to indicate that the row has a match
    let (emit_streamed, emit_buffered, mark_row_as_match) = match self.current_ordering {
        Ordering::Less => match self.join_type {
            JoinType::Left
            | JoinType::Right
            | JoinType::RightSemi
            | JoinType::Full
            | JoinType::LeftAnti
            | JoinType::RightAnti
            | JoinType::LeftMark => (!self.streamed_joined, false, false),
            _ => (false, false, false),
        },
        Ordering::Equal => match self.join_type {
            JoinType::LeftSemi | JoinType::LeftMark => {
                let is_mark = matches!(self.join_type, JoinType::LeftMark);
                if has_filter {
                    // With a join filter the streamed index must be output only if
                    // it has not been emitted before: `join_filter_matched_idxs`
                    // remembers the streamed indices that already had a successful
                    // filter match, so the same index is not output twice.
                    let already_matched = self
                        .streamed_batch
                        .join_filter_matched_idxs
                        .contains(&(self.streamed_batch.idx as u64));
                    let pending = !already_matched && !self.streamed_joined;
                    // The filter may reference buffered columns, so the buffered
                    // side has to be joined whenever the streamed side is.
                    (pending, pending, is_mark)
                } else {
                    (!self.streamed_joined, false, is_mark)
                }
            }
            JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
                (true, true, false)
            }
            JoinType::LeftAnti | JoinType::RightAnti if has_filter => {
                let pending = !self.streamed_joined;
                (pending, pending, false)
            }
            _ => (false, false, false),
        },
        Ordering::Greater => match self.join_type {
            JoinType::Full => (false, !self.buffered_joined, false),
            _ => (false, false, false),
        },
    };

    if !(emit_streamed || emit_buffered) {
        // Neither side produces output for this step.
        self.buffered_data.scanning_finish();
        return Ok(());
    }

    if !emit_buffered {
        // Streamed row joined with nulls: emit it exactly once.
        let buffered_batch_idx = (!self.buffered_data.scanning_finished())
            .then_some(self.buffered_data.scanning_batch_idx);
        // For Mark joins a dummy row id (0) signals that the row has a match.
        let buffered_row_idx = if mark_row_as_match { Some(0) } else { None };
        self.streamed_batch
            .append_output_pair(buffered_batch_idx, buffered_row_idx);
        self.output_size += 1;
        self.buffered_data.scanning_finish();
        self.streamed_joined = true;
        return Ok(());
    }

    // Buffered rows joined with either the streamed row or nulls; keep going
    // until the scan is exhausted or the output batch is full.
    while !(self.buffered_data.scanning_finished() || self.output_size >= self.batch_size) {
        let row_idx = self.buffered_data.scanning_idx();
        if emit_streamed {
            // Pair the streamed row with the current buffered row.
            let batch_idx = self.buffered_data.scanning_batch_idx;
            self.streamed_batch
                .append_output_pair(Some(batch_idx), Some(row_idx));
        } else {
            // FULL join: the buffered row is joined with nulls.
            self.buffered_data
                .scanning_batch_mut()
                .null_joined
                .push(row_idx);
        }
        self.output_size += 1;
        self.buffered_data.scanning_advance();
        if self.buffered_data.scanning_finished() {
            // The whole scan has been consumed: record which sides were joined.
            self.streamed_joined = emit_streamed;
            self.buffered_joined = true;
        }
    }

    Ok(())
}