fn get_arrow_projection_mask()

in crates/iceberg/src/arrow/reader.rs [490:608]


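    /// Build a Parquet `ProjectionMask` covering only the leaf columns that back the requested
    /// Iceberg field ids, verifying along the way that every file column's type can be promoted
    /// to the type the task schema asks for. Returns `ProjectionMask::all()` when no leaf fields
    /// resolve against the task schema.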
    fn get_arrow_projection_mask(
        field_ids: &[i32],
        iceberg_schema_of_task: &Schema,
        parquet_schema: &SchemaDescriptor,
        arrow_schema: &ArrowSchemaRef,
    ) -> Result<ProjectionMask> {
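        // A file column may be read as the requested type when the types are identical or when
        // the promotion is one that Iceberg allows: int -> long, float -> double, widening a
        // decimal's precision at the same scale, or fixed(16) -> uuid.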
        fn type_promotion_is_valid(
            file_type: Option<&PrimitiveType>,
            projected_type: Option<&PrimitiveType>,
        ) -> bool {
            match (file_type, projected_type) {
                (Some(lhs), Some(rhs)) if lhs == rhs => true,
                (Some(PrimitiveType::Int), Some(PrimitiveType::Long)) => true,
                (Some(PrimitiveType::Float), Some(PrimitiveType::Double)) => true,
                (
                    Some(PrimitiveType::Decimal {
                        precision: file_precision,
                        scale: file_scale,
                    }),
                    Some(PrimitiveType::Decimal {
                        precision: requested_precision,
                        scale: requested_scale,
                    }),
                ) if requested_precision >= file_precision && file_scale == requested_scale => true,
                // Uuid is stored as Fixed(16) in the Parquet file, so the type read back is Fixed(16).
                (Some(PrimitiveType::Fixed(16)), Some(PrimitiveType::Uuid)) => true,
                _ => false,
            }
        }

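        // Expand each requested field id into the ids of its leaf fields via
        // `include_leaf_field_id`; ids that are not present in the task schema are skipped.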
        let mut leaf_field_ids = vec![];
        for field_id in field_ids {
            if let Some(field) = iceberg_schema_of_task.field_by_id(*field_id) {
                Self::include_leaf_field_id(field, &mut leaf_field_ids);
            }
        }

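        // If no leaf fields were resolved against the task schema, read every column.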
        if leaf_field_ids.is_empty() {
            Ok(ProjectionMask::all())
        } else {
            // Build the map between field id and column index in Parquet schema.
            let mut column_map = HashMap::new();

            let fields = arrow_schema.fields();
            // Pre-project only the fields that have been selected; this avoids converting Arrow
            // types that are not yet supported unless they were actually requested.
            let mut projected_fields: HashMap<FieldRef, i32> = HashMap::new();
            let projected_arrow_schema = ArrowSchema::new_with_metadata(
                fields.filter_leaves(|_, f| {
                    f.metadata()
                        .get(PARQUET_FIELD_ID_META_KEY)
                        .and_then(|field_id| i32::from_str(field_id).ok())
                        .is_some_and(|field_id| {
                            projected_fields.insert((*f).clone(), field_id);
                            leaf_field_ids.contains(&field_id)
                        })
                }),
                arrow_schema.metadata().clone(),
            );
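            // Convert the projected Arrow schema back into an Iceberg schema so the types the
            // file actually stores can be compared field by field with the task schema.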
            let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?;

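            // Walk the leaves again: keep a leaf only if it exists in both schemas and its file
            // type can be promoted to the requested type, recording its leaf column index.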
            fields.filter_leaves(|idx, field| {
                let Some(field_id) = projected_fields.get(field).cloned() else {
                    return false;
                };

                let iceberg_field = iceberg_schema_of_task.field_by_id(field_id);
                let parquet_iceberg_field = iceberg_schema.field_by_id(field_id);

                let (Some(iceberg_field), Some(parquet_iceberg_field)) =
                    (iceberg_field, parquet_iceberg_field)
                else {
                    return false;
                };

                if !type_promotion_is_valid(
                    parquet_iceberg_field.field_type.as_primitive_type(),
                    iceberg_field.field_type.as_primitive_type(),
                ) {
                    return false;
                }

                column_map.insert(field_id, idx);
                true
            });

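            // Every requested leaf must have been matched; otherwise the file's schema cannot
            // serve this task's schema.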
            if column_map.len() != leaf_field_ids.len() {
                let missing_fields = leaf_field_ids
                    .iter()
                    .filter(|field_id| !column_map.contains_key(field_id))
                    .collect::<Vec<_>>();
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Parquet schema {} and Iceberg schema {} do not match.",
                        iceberg_schema, iceberg_schema_of_task
                    ),
                )
                .with_context("column_map", format!("{:?}", column_map))
                .with_context("field_ids", format!("{:?}", leaf_field_ids))
                .with_context("missing_fields", format!("{:?}", missing_fields)));
            }

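            // Emit the column indices in the order the leaf field ids were requested.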
            let mut indices = vec![];
            for field_id in leaf_field_ids {
                if let Some(col_idx) = column_map.get(&field_id) {
                    indices.push(*col_idx);
                } else {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!("Field {} is not found in the Parquet schema.", field_id),
                    ));
                }
            }
            Ok(ProjectionMask::leaves(parquet_schema, indices))
        }
    }
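
A minimal sketch of how the returned mask is typically consumed, written as if it sat next to this function in reader.rs (get_arrow_projection_mask is private to that module). The helper name read_projected_columns, the use of std::fs::File, and the assumption that the surrounding impl is ArrowReader are illustrative only; the real reader goes through its own file IO. The rest is the public parquet ArrowReaderBuilder API: parquet_schema() and schema() supply the two schema arguments, and with_projection() applies the mask so that only the selected leaf columns are decoded.

    use std::fs::File;

    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

    // Hypothetical helper: `field_ids` and `task_schema` would come from the scan task.
    fn read_projected_columns(
        file: File,
        field_ids: &[i32],
        task_schema: &Schema,
    ) -> std::result::Result<(), Box<dyn std::error::Error>> {
        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

        // The builder exposes both schema views that get_arrow_projection_mask needs.
        let arrow_schema = builder.schema().clone();
        let mask = ArrowReader::get_arrow_projection_mask(
            field_ids,
            task_schema,
            builder.parquet_schema(),
            &arrow_schema,
        )?;

        // Only the selected leaf columns are decoded; the rest are skipped at the Parquet layer.
        for batch in builder.with_projection(mask).build()? {
            let _batch = batch?;
        }
        Ok(())
    }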