in crates/iceberg/src/arrow/reader.rs [662:721]
/// Builds the [`RowSelection`] obtained by evaluating `predicate` against the
/// Parquet page index of each row group that is going to be read.
///
/// Row groups are visited in file order. When `selected_row_groups` is `Some`,
/// only the row groups whose index appears in that list are evaluated and
/// contribute selectors to the result; all other row groups are skipped.
/// The list is consumed front to back while the row group index increases, so
/// it has to be sorted in ascending order for every entry to be honoured.
/// An empty list yields an empty selection.
///
/// # Errors
///
/// Returns an [`ErrorKind::Unexpected`] error when the Parquet metadata has no
/// column index or no offset index, and propagates any error returned by
/// [`PageIndexEvaluator::eval`].
fn get_row_selection_for_filter_predicate(
    predicate: &BoundPredicate,
    parquet_metadata: &Arc<ParquetMetaData>,
    selected_row_groups: &Option<Vec<usize>>,
    field_id_map: &HashMap<i32, usize>,
    snapshot_schema: &Schema,
) -> Result<RowSelection> {
    let Some(column_index) = parquet_metadata.column_index() else {
        return Err(Error::new(
            ErrorKind::Unexpected,
            "Parquet file metadata does not contain a column index",
        ));
    };

    let Some(offset_index) = parquet_metadata.offset_index() else {
        return Err(Error::new(
            ErrorKind::Unexpected,
            "Parquet file metadata does not contain an offset index",
        ));
    };

    // Position of the next entry of `selected_row_groups` that has not been
    // matched against a row group yet.
    let mut selected_row_groups_idx = 0;

    let page_index = column_index
        .iter()
        .enumerate()
        .zip(offset_index)
        .zip(parquet_metadata.row_groups());

    let mut results = Vec::new();

    for (((idx, column_index), offset_index), row_group_metadata) in page_index {
        if let Some(selected_row_groups) = selected_row_groups {
            // Checked lookup: an empty (or fully consumed) selection must end
            // the scan instead of indexing past the end of the list.
            match selected_row_groups.get(selected_row_groups_idx) {
                None => break,
                Some(&selected) if selected == idx => selected_row_groups_idx += 1,
                // skip row groups that aren't present in selected_row_groups
                Some(_) => continue,
            }
        }

        let selections_for_page = PageIndexEvaluator::eval(
            predicate,
            column_index,
            offset_index,
            row_group_metadata,
            field_id_map,
            snapshot_schema,
        )?;

        results.push(selections_for_page);
    }

    Ok(results.into_iter().flatten().collect::<Vec<_>>().into())
}