in crates/iceberg/src/arrow/reader.rs [490:608]
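    /// Builds a Parquet [`ProjectionMask`] selecting the leaf columns that back the
    /// requested Iceberg `field_ids`, verifying along the way that any type promotion
    /// from the file's schema to the task's schema is valid.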
    fn get_arrow_projection_mask(
        field_ids: &[i32],
        iceberg_schema_of_task: &Schema,
        parquet_schema: &SchemaDescriptor,
        arrow_schema: &ArrowSchemaRef,
    ) -> Result<ProjectionMask> {
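        // Returns true when a column written with `file_type` may be read as
        // `projected_type`: identical types, int -> long, float -> double, decimal
        // precision widening at the same scale, and fixed(16) -> uuid.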
        fn type_promotion_is_valid(
            file_type: Option<&PrimitiveType>,
            projected_type: Option<&PrimitiveType>,
        ) -> bool {
            match (file_type, projected_type) {
                (Some(lhs), Some(rhs)) if lhs == rhs => true,
                (Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
                (Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
                (
                    Some(PrimitiveType::Decimal {
                        precision: file_precision,
                        scale: file_scale,
                    }),
                    Some(PrimitiveType::Decimal {
                        precision: requested_precision,
                        scale: requested_scale,
                    }),
                ) if requested_precision >= file_precision && file_scale == requested_scale => true,
                // Uuid is stored as Fixed(16) in the Parquet file, so it is read back as Fixed(16).
                (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
                _ => false,
            }
        }
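
        // Expand each requested field into the IDs of its leaf sub-fields, since
        // nested types (structs, lists, maps) map to multiple Parquet leaf columns.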
        let mut leaf_field_ids = vec![];
        for field_id in field_ids {
            let field = iceberg_schema_of_task.field_by_id(*field_id);
            if let Some(field) = field {
                Self::include_leaf_field_id(field, &mut leaf_field_ids);
            }
        }

        if leaf_field_ids.is_empty() {
            Ok(ProjectionMask::all())
        } else {
            // Build the map between field id and column index in the Parquet schema.
            let mut column_map = HashMap::new();
            let fields = arrow_schema.fields();
            // Pre-project only the fields that have been selected, possibly avoiding
            // converting some Arrow types that are not yet supported.
            let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
            let projected_arrow_schema = ArrowSchema::new_with_metadata(
                fields.filter_leaves(|_, f| {
                    f.metadata()
                        .get(PARQUET_FIELD_ID_META_KEY)
                        .and_then(|field_id| i32::from_str(field_id).ok())
                        .is_some_and(|field_id| {
                            projected_fields.insert((*f).clone(), field_id);
                            leaf_field_ids.contains(&field_id)
                        })
                }),
                arrow_schema.metadata().clone(),
            );
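
            // Convert the pruned Arrow schema into an Iceberg schema so the file's
            // column types can be compared against the task schema below.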
            let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;
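
            // Second pass over the leaves: verify that each file type can be promoted
            // to the requested type and record the leaf's column index in the file.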
            fields.filter_leaves(|idx, field| {
                let Some(field_id) = projected_fields.get(field).cloned() else {
                    return false;
                };

                let (Some(iceberg_field), Some(parquet_iceberg_field)) = (
                    iceberg_schema_of_task.field_by_id(field_id),
                    iceberg_schema.field_by_id(field_id),
                ) else {
                    return false;
                };

                if !type_promotion_is_valid(
                    parquet_iceberg_field.field_type.as_primitive_type(),
                    iceberg_field.field_type.as_primitive_type(),
                ) {
                    return false;
                }

                column_map.insert(field_id, idx);
                true
            });
            if column_map.len() != leaf_field_ids.len() {
                let missing_fields = leaf_field_ids
                    .iter()
                    .filter(|field_id| !column_map.contains_key(field_id))
                    .collect::<Vec<_>>();
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Parquet schema {} and Iceberg schema {} do not match.",
                        iceberg_schema, iceberg_schema_of_task
                    ),
                )
                .with_context("column_map", format!("{:?}", column_map))
                .with_context("field_ids", format!("{:?}", leaf_field_ids))
                .with_context("missing_fields", format!("{:?}", missing_fields)));
            }
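
            // Collect the Parquet leaf indices in the order the leaf fields were requested.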
            let mut indices = vec![];
            for field_id in leaf_field_ids {
                if let Some(col_idx) = column_map.get(&field_id) {
                    indices.push(*col_idx);
                } else {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Field {} is not found in the Parquet schema.", field_id),
                    ));
                }
            }

            Ok(ProjectionMask::leaves(parquet_schema, indices))
        }
    }
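
    // A minimal usage sketch (hypothetical schemas and field IDs, not taken from
    // this crate's tests): given a task schema whose field 1 is `long` while the
    // file wrote it as `int`, and whose field 2 is `string`,
    //
    //     let mask = Self::get_arrow_projection_mask(
    //         &[1, 2],
    //         &task_schema,
    //         &parquet_schema_descriptor,
    //         &file_arrow_schema,
    //     )?;
    //
    // would select only the two backing leaf columns, accepting the int -> long
    // promotion; an invalid promotion (e.g. long -> int) or a column missing from
    // the file yields an `ErrorKind::DataInvalid` error instead.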