fn get_corrected_filter_mask()

in datafusion/physical-plan/src/joins/sort_merge_join.rs [969:1124]


/// Converts the raw join-filter results in `filter_mask` into the mask that
/// decides which rows are actually produced for `join_type`.
///
/// # Runs
///
/// Entry `i` of `filter_mask` is the filter outcome paired with position `i` of
/// `row_indices` / `batch_ids`. Consecutive entries form runs. A run ends at `i`
/// when `last_index_for_row(i, row_indices, batch_ids, row_indices.len())`
/// returns `true`, and all per-run state is reset there. (Presumably one run
/// holds all candidate matches of one streamed row; confirm against the caller.)
///
/// # Mask values
///
/// * `Some(true)`: keep the entry in the output.
/// * `None`: ignore the entry; nothing is sent to the output.
/// * `Some(false)`: convert the entry to a null-joined row.
///
/// # Padding
///
/// * `Left`, `Right` and `LeftMark` are padded with `false` up to `expected_size`.
/// * `LeftAnti` and `RightAnti` are padded with `true` up to `expected_size`.
/// * `LeftSemi` and `Full` are returned with exactly `row_indices.len()` entries.
/// * Every other join type yields `None`.
///
/// NOTE(review): the padding assumes `expected_size >= row_indices.len()`; a
/// smaller value underflows the `usize` subtraction.
fn get_corrected_filter_mask(
    join_type: JoinType,
    row_indices: &UInt64Array,
    batch_ids: &[usize],
    filter_mask: &BooleanArray,
    expected_size: usize,
) -> Option<BooleanArray> {
    let len = row_indices.len();
    // `true` when entry `i` is the final entry of its run.
    let is_run_end = |i: usize| last_index_for_row(i, row_indices, batch_ids, len);

    match join_type {
        JoinType::Left | JoinType::Right | JoinType::LeftMark => {
            // Left/Right keep every matching entry of a run; LeftMark keeps only
            // the first one and ignores later matches.
            let keep_repeat_matches = !matches!(join_type, JoinType::LeftMark);
            let mut builder = BooleanBuilder::with_capacity(len);
            let mut matched_in_run = false;

            for i in 0..len {
                let run_end = is_run_end(i);
                let matched = filter_mask.value(i);

                let slot = match (matched, matched_in_run) {
                    // First match of the run.
                    (true, false) => Some(true),
                    // A repeat match, wanted only by Left/Right.
                    (true, true) if keep_repeat_matches => Some(true),
                    // No match anywhere in the run: only its final entry becomes
                    // a null-joined row.
                    (false, false) if run_end => Some(false),
                    // Misses before the end, misses after a match, and unwanted
                    // repeat matches are all ignored.
                    _ => None,
                };
                builder.append_option(slot);

                matched_in_run = (matched_in_run || matched) && !run_end;
            }

            // Remaining positions had no matching join key at all; they become
            // null-joined rows.
            builder.append_n(expected_size - builder.len(), false);
            Some(builder.finish())
        }
        JoinType::LeftSemi => {
            // Keep exactly the first matching entry of each run; ignore the rest.
            let mut builder = BooleanBuilder::with_capacity(len);
            let mut matched_in_run = false;

            for i in 0..len {
                let run_end = is_run_end(i);
                let matched = filter_mask.value(i);

                if matched && !matched_in_run {
                    builder.append_value(true);
                } else {
                    builder.append_null();
                }

                matched_in_run = (matched_in_run || matched) && !run_end;
            }

            Some(builder.finish())
        }
        JoinType::LeftAnti | JoinType::RightAnti => {
            // A run is kept (once, at its final entry) only if none of its
            // entries matched; every other entry is ignored.
            let mut builder = BooleanBuilder::with_capacity(len);
            let mut matched_in_run = false;

            for i in 0..len {
                let run_end = is_run_end(i);
                let any_match = matched_in_run || filter_mask.value(i);

                if run_end && !any_match {
                    builder.append_value(true);
                } else {
                    builder.append_null();
                }

                matched_in_run = any_match && !run_end;
            }

            // Remaining positions had no matching join key, which for anti joins
            // counts as "not matched": keep them.
            builder.append_n(expected_size - builder.len(), true);
            Some(builder.finish())
        }
        JoinType::Full => {
            let mut mask: Vec<Option<bool>> = Vec::with_capacity(len);
            // Index in `mask` where the current run begins.
            let mut run_start = 0;
            // Index of the first matching entry of the current run, if any.
            let mut first_match: Option<usize> = None;
            // Whether a non-null, non-matching entry was already seen in this run.
            let mut missed_in_run = false;

            for i in 0..len {
                let run_end = is_run_end(i);
                let matched = filter_mask.value(i);
                let is_null = filter_mask.is_null(i);

                if matched && first_match.is_none() {
                    first_match = Some(i);
                }

                // Null-mask entries and matches are kept. Only the first miss of a
                // run that has not matched yet is recorded as `Some(false)`; later
                // misses are ignored.
                let slot = if is_null || matched {
                    Some(true)
                } else if first_match.is_some() || missed_in_run {
                    None
                } else {
                    Some(false)
                };
                mask.push(slot);

                missed_in_run |= !is_null && !matched;

                if run_end {
                    // The run matched at least once. Reset everything recorded
                    // before its first match to `None`, null-mask entries
                    // included. The run's first miss, if it came before the
                    // match, sits in that prefix as `Some(false)`. Clearing it
                    // stops the row being emitted again as a null-joined row.
                    // `first` was set in this run, so `run_start <= first <= i`.
                    if let Some(first) = first_match.take() {
                        mask[run_start..first].fill(None);
                    }
                    missed_in_run = false;
                    run_start = i + 1;
                }
            }

            Some(BooleanArray::from(mask))
        }
        // The remaining join types do not need a corrected filter mask.
        _ => None,
    }
}