in crates/iceberg/src/arrow/reader.rs [347:450]
/// Builds a [`RowSelection`] that excludes the rows marked in `positional_deletes`.
///
/// Positions in `positional_deletes` are treated as file-global row ordinals: row group
/// `i` covers `[base_i, base_i + num_rows_i)`, where `base_i` is the sum of the row counts
/// of all preceding row groups (whether or not they are selected).
///
/// * `row_group_metadata_list` - metadata for every row group in the file, in file order.
/// * `selected_row_groups` - when `Some`, only these row groups are read. Unselected row
///   groups contribute no selectors, so the result describes the concatenation of the
///   selected row groups only. NOTE(review): assumed to be sorted ascending — an unsorted
///   list stops matching at the first out-of-order entry; confirm with callers.
/// * `positional_deletes` - the deleted row positions. The algorithm assumes its iterator
///   yields strictly ascending positions.
///
/// Returns a selection made of alternating `select`/`skip` runs. This function currently
/// never returns `Err`.
fn build_deletes_row_selection(
    row_group_metadata_list: &[RowGroupMetaData],
    selected_row_groups: &Option<Vec<usize>>,
    positional_deletes: &DeleteVector,
) -> Result<RowSelection> {
    let mut results: Vec<RowSelector> = Vec::new();
    let mut selected_row_groups_idx = 0;
    // File-global index of the first row of the row group about to be visited.
    let mut next_row_group_start: u64 = 0;
    let mut delete_vector_iter = positional_deletes.iter();
    // The next not-yet-handled deleted position, pulled ahead from the iterator.
    let mut next_deleted_row_idx_opt = delete_vector_iter.next();

    for (idx, row_group_metadata) in row_group_metadata_list.iter().enumerate() {
        let row_group_num_rows = row_group_metadata.num_rows() as u64;
        let current_row_group_base_idx = next_row_group_start;
        let next_row_group_base_idx = current_row_group_base_idx + row_group_num_rows;
        // Advance the running base before any `continue`, so that every path through the
        // loop body (skipped group, group without deletes, group with deletes) leaves it
        // pointing at the first row of the next row group.
        next_row_group_start = next_row_group_base_idx;

        if let Some(selected_row_groups) = selected_row_groups {
            // Once every selected row group has been handled there is nothing left to emit.
            if selected_row_groups_idx == selected_row_groups.len() {
                break;
            }

            if idx == selected_row_groups[selected_row_groups_idx] {
                // Selected: look for the next selected row group on the following iteration.
                selected_row_groups_idx += 1;
            } else {
                // This row group is not read, so drop any deletes that fall inside it. Only
                // touch the iterator when the cached delete actually lies in this row group.
                // If it already belongs to a later row group, pulling `next()` would discard
                // it, since the iterator cannot be rewound.
                let cached_delete_in_skipped_group = matches!(
                    next_deleted_row_idx_opt,
                    Some(deleted) if deleted < next_row_group_base_idx
                );
                if cached_delete_in_skipped_group {
                    delete_vector_iter.advance_to(next_row_group_base_idx);
                    next_deleted_row_idx_opt = delete_vector_iter.next();
                }
                continue;
            }
        }

        let mut current_idx = current_row_group_base_idx;
        // Length of the run of consecutive deleted rows that has not been emitted yet.
        let mut pending_skip: usize = 0;

        while let Some(deleted_idx) = next_deleted_row_idx_opt {
            if deleted_idx >= next_row_group_base_idx {
                // The delete belongs to a later row group; keep it cached for that group.
                break;
            }
            if deleted_idx > current_idx {
                // A gap of live rows ends the current skip run.
                if pending_skip > 0 {
                    results.push(RowSelector::skip(pending_skip));
                    pending_skip = 0;
                }
                results.push(RowSelector::select((deleted_idx - current_idx) as usize));
                current_idx = deleted_idx;
            }
            if deleted_idx == current_idx {
                pending_skip += 1;
                current_idx += 1;
            }
            // A position below `current_idx` was already covered and is simply consumed.
            // Every iteration consumes exactly one delete, so this loop always terminates.
            next_deleted_row_idx_opt = delete_vector_iter.next();
        }

        if pending_skip > 0 {
            results.push(RowSelector::skip(pending_skip));
        }
        // Select whatever live rows remain after the last delete in this row group
        // (the whole row group when it contains no deletes).
        if current_idx < next_row_group_base_idx {
            results.push(RowSelector::select(
                (next_row_group_base_idx - current_idx) as usize,
            ));
        }
    }

    Ok(results.into())
}