in datafusion/physical-plan/src/joins/sort_merge_join.rs [969:1124]
/// Converts the raw result of evaluating the join filter (`filter_mask`) into
/// the mask that is applied for `join_type`.
///
/// Positions `0..row_indices.len()` are processed as consecutive groups. A
/// group ends at every position for which `last_index_for_row` returns `true`,
/// and all per-group state is reset there. Presumably a group holds the
/// candidate matches of one streamed row; TODO confirm against
/// `last_index_for_row`.
///
/// Per join type, within each group:
/// * `Left` / `Right`: every row whose filter value is `true` stays `true`.
///   If no row of the group passed, the group's last row becomes `false`.
///   Every other row becomes null.
/// * `LeftMark`: like `Left` / `Right`, except only the first passing row of
///   a group is `true`; later passing rows are null.
/// * `LeftSemi`: the first passing row of a group is `true`; everything else
///   is null. No padding is appended.
/// * `LeftAnti` / `RightAnti`: the last row of a group in which no row passed
///   is `true`; every other row is null.
/// * `Full`: rows whose filter result is `true` or null become `Some(true)`.
///   The first non-null failing row seen before any passing row becomes
///   `Some(false)`, and later failing rows become `None`. If the group has a
///   passing row, every position before its first passing row is reset to
///   `None`.
///
/// For `Left`, `Right` and `LeftMark`, the mask is then padded with `false`
/// up to `expected_size`. For `LeftAnti` and `RightAnti`, it is padded with
/// `true`. Padding assumes `expected_size` is at least `row_indices.len()`.
///
/// NOTE(review): only the `Full` arm consults `is_null`. The other arms read
/// `filter_mask.value(idx)` directly, so a null slot is interpreted by its
/// underlying value bit there.
///
/// Returns `None` for every other join type, which needs no correction.
fn get_corrected_filter_mask(
    join_type: JoinType,
    row_indices: &UInt64Array,
    batch_ids: &[usize],
    filter_mask: &BooleanArray,
    expected_size: usize,
) -> Option<BooleanArray> {
    let num_rows = row_indices.len();
    let mut builder: BooleanBuilder = BooleanBuilder::with_capacity(num_rows);
    // Whether the current group already contains a row that passed the filter.
    let mut group_matched = false;

    match join_type {
        JoinType::Left | JoinType::Right => {
            for idx in 0..num_rows {
                let is_last = last_index_for_row(idx, row_indices, batch_ids, num_rows);
                let passed = filter_mask.value(idx);
                if passed {
                    group_matched = true;
                    builder.append_value(true);
                } else if !group_matched && is_last {
                    // Nothing in this group passed: becomes a null-joined row.
                    builder.append_value(false);
                } else {
                    // Failing row that must not reach the output.
                    builder.append_null();
                }
                if is_last {
                    group_matched = false;
                }
            }
            // Remaining positions are records without a matching join key;
            // they become null-joined rows.
            let padding = expected_size - builder.len();
            builder.append_n(padding, false);
            Some(builder.finish())
        }
        JoinType::LeftMark => {
            for idx in 0..num_rows {
                let is_last = last_index_for_row(idx, row_indices, batch_ids, num_rows);
                let passed = filter_mask.value(idx);
                if passed && !group_matched {
                    // Only the first passing row of the group is kept.
                    group_matched = true;
                    builder.append_value(true);
                } else if !group_matched && is_last {
                    // Nothing in this group passed: becomes a null-joined row.
                    builder.append_value(false);
                } else {
                    builder.append_null();
                }
                if is_last {
                    group_matched = false;
                }
            }
            // Remaining positions are records without a matching join key.
            let padding = expected_size - builder.len();
            builder.append_n(padding, false);
            Some(builder.finish())
        }
        JoinType::LeftSemi => {
            for idx in 0..num_rows {
                let is_last = last_index_for_row(idx, row_indices, batch_ids, num_rows);
                if filter_mask.value(idx) && !group_matched {
                    group_matched = true;
                    builder.append_value(true);
                } else {
                    builder.append_null();
                }
                if is_last {
                    group_matched = false;
                }
            }
            Some(builder.finish())
        }
        JoinType::LeftAnti | JoinType::RightAnti => {
            for idx in 0..num_rows {
                let is_last = last_index_for_row(idx, row_indices, batch_ids, num_rows);
                group_matched |= filter_mask.value(idx);
                // A single `true` at the end of a group that never matched.
                if is_last && !group_matched {
                    builder.append_value(true);
                } else {
                    builder.append_null();
                }
                if is_last {
                    group_matched = false;
                }
            }
            // Records without a matching join key count as anti-join output.
            let padding = expected_size - builder.len();
            builder.append_n(padding, true);
            Some(builder.finish())
        }
        JoinType::Full => {
            let mut mask: Vec<Option<bool>> = vec![Some(true); num_rows];
            // First position of the current group.
            let mut group_start = 0;
            // Position of the first passing row in the current group, if any.
            let mut first_match: Option<usize> = None;
            // Whether a non-null failing row was already seen in this group.
            let mut group_failed = false;
            for idx in 0..num_rows {
                let is_last = last_index_for_row(idx, row_indices, batch_ids, num_rows);
                let passed = filter_mask.value(idx);
                let null_result = filter_mask.is_null(idx);

                if passed && first_match.is_none() {
                    first_match = Some(idx);
                }

                mask[idx] = if null_result || passed {
                    Some(true)
                } else if first_match.is_some() || group_failed {
                    None
                } else {
                    Some(false)
                };

                if !null_result && !passed {
                    group_failed = true;
                }

                if is_last {
                    // A matched group is emitted through its passing rows, so
                    // everything before the first passing row is suppressed.
                    // `first >= group_start` always holds here.
                    if let Some(first) = first_match {
                        mask[group_start..first].fill(None);
                    }
                    first_match = None;
                    group_failed = false;
                    group_start = idx + 1;
                }
            }
            Some(BooleanArray::from(mask))
        }
        // No correction is needed for the remaining join types.
        _ => None,
    }
}