in crates/core/src/file_group/reader.rs [88:144]
/// Builds a boolean mask selecting the base-file records whose commit time
/// lies within the configured file-group timestamp range.
///
/// The range is `(start, end]`: the bound read from
/// `HudiReadConfig::FileGroupStartTimestamp` is exclusive (`>`), while the
/// optional bound read from `HudiReadConfig::FileGroupEndTimestamp` is
/// inclusive (`<=`). All bounds are combined with a logical AND.
///
/// # Returns
///
/// * `Ok(None)` when no filtering applies, i.e. meta fields are not
///   populated or no start timestamp is configured.
/// * `Ok(Some(mask))` otherwise, where `mask` is the conjunction of every
///   commit-time comparison evaluated against `records`.
///
/// # Errors
///
/// Returns an error if a filter cannot be built against the meta-field
/// schema, if `records` does not contain the column a filter targets, or if
/// evaluating or combining the comparisons fails.
fn create_filtering_mask_for_base_file_records(
    &self,
    records: &RecordBatch,
) -> Result<Option<BooleanArray>> {
    let populates_meta_fields = self
        .hudi_configs
        .get_or_default(HudiTableConfig::PopulatesMetaFields)
        .to::<bool>();
    if !populates_meta_fields {
        // If meta fields are not populated, commit time filtering is not applicable.
        return Ok(None);
    }

    let schema = MetaField::schema();

    let start = match self
        .hudi_configs
        .try_get(HudiReadConfig::FileGroupStartTimestamp)
        .map(|v| v.to::<String>())
    {
        Some(start) => start,
        // If start timestamp is not provided, the query is snapshot or time-travel, so
        // commit time filtering is not needed as the base file being read is already
        // filtered and selected by the timeline.
        None => return Ok(None),
    };

    // At most two bounds exist: the mandatory start and the optional end.
    let mut and_filters: Vec<SchemableFilter> = Vec::with_capacity(2);

    // Exclusive lower bound: commit time must be strictly after `start`.
    let start_filter: Filter =
        Filter::try_from((MetaField::CommitTime.as_ref(), ">", start.as_str()))?;
    and_filters.push(SchemableFilter::try_from((start_filter, schema.as_ref()))?);

    // Inclusive upper bound, only when an end timestamp is configured.
    if let Some(end) = self
        .hudi_configs
        .try_get(HudiReadConfig::FileGroupEndTimestamp)
        .map(|v| v.to::<String>())
    {
        let end_filter = Filter::try_from((MetaField::CommitTime.as_ref(), "<=", end.as_str()))?;
        and_filters.push(SchemableFilter::try_from((end_filter, schema.as_ref()))?);
    }

    // `and_filters` always holds the start filter here, so no emptiness check is
    // needed. Start from an all-true mask and narrow it with each comparison.
    let mut mask = BooleanArray::from(vec![true; records.num_rows()]);
    for filter in &and_filters {
        let col_name = filter.field.name().as_str();
        let col_values = records
            .column_by_name(col_name)
            .ok_or_else(|| ReadFileSliceError(format!("Column {col_name} not found")))?;
        let comparison = filter.apply_comparsion(col_values)?;
        mask = and(&mask, &comparison)?;
    }
    Ok(Some(mask))
}