fn try_from()

in crates/iceberg/src/spec/table_metadata.rs [917:1068]


        fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
            let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
                None
            } else {
                value.current_snapshot_id
            };

            let (schemas, current_schema_id, current_schema) =
                if let (Some(schemas_vec), Some(schema_id)) =
                    (&value.schemas, value.current_schema_id)
                {
                    // Option 1: Use 'schemas' + 'current_schema_id'
                    let schema_map = HashMap::from_iter(
                        schemas_vec
                            .clone()
                            .into_iter()
                            .map(|schema| {
                                let schema: Schema = schema.try_into()?;
                                Ok((schema.schema_id(), Arc::new(schema)))
                            })
                            .collect::<Result<Vec<_>, Error>>()?,
                    );

                    let schema = schema_map
                        .get(&schema_id)
                        .ok_or_else(|| {
                            Error::new(
                                ErrorKind::DataInvalid,
                                format!(
                                    "No schema exists with the current schema id {}.",
                                    schema_id
                                ),
                            )
                        })?
                        .clone();
                    (schema_map, schema_id, schema)
                } else if let Some(schema) = value.schema {
                    // Option 2: Fall back to `schema`
                    let schema: Schema = schema.try_into()?;
                    let schema_id = schema.schema_id();
                    let schema_arc = Arc::new(schema);
                    let schema_map = HashMap::from_iter(vec![(schema_id, schema_arc.clone())]);
                    (schema_map, schema_id, schema_arc)
                } else {
                    // Option 3: No valid schema configuration found
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        "No valid schema configuration found in table metadata",
                    ));
                };

            // Prioritize 'partition_specs' over 'partition_spec'
            let partition_specs = if let Some(specs_vec) = value.partition_specs {
                // Option 1: Use 'partition_specs'
                specs_vec
                    .into_iter()
                    .map(|x| (x.spec_id(), Arc::new(x)))
                    .collect::<HashMap<_, _>>()
            } else if let Some(partition_spec) = value.partition_spec {
                // Option 2: Fall back to 'partition_spec'
                let spec = PartitionSpec::builder(current_schema.clone())
                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
                    .add_unbound_fields(partition_spec.into_iter().map(|f| f.into_unbound()))?
                    .build()?;

                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
            } else {
                // Option 3: Create empty partition spec
                let spec = PartitionSpec::builder(current_schema.clone())
                    .with_spec_id(DEFAULT_PARTITION_SPEC_ID)
                    .build()?;

                HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
            };

            // Get the default_spec_id, prioritizing the explicit value if provided
            let default_spec_id = value
                .default_spec_id
                .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());

            // Get the default spec
            let default_spec: PartitionSpecRef = partition_specs
                .get(&default_spec_id)
                .map(|x| Arc::unwrap_or_clone(x.clone()))
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::DataInvalid,
                        format!("Default partition spec {default_spec_id} not found"),
                    )
                })?
                .into();
            let default_partition_type = default_spec.partition_type(&current_schema)?;

            let mut metadata = TableMetadata {
                format_version: FormatVersion::V1,
                table_uuid: value.table_uuid.unwrap_or_default(),
                location: value.location,
                last_sequence_number: 0,
                last_updated_ms: value.last_updated_ms,
                last_column_id: value.last_column_id,
                current_schema_id,
                default_spec,
                default_partition_type,
                last_partition_id: value
                    .last_partition_id
                    .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
                partition_specs,
                schemas,
                properties: value.properties.unwrap_or_default(),
                current_snapshot_id,
                snapshots: value
                    .snapshots
                    .map(|snapshots| {
                        Ok::<_, Error>(HashMap::from_iter(
                            snapshots
                                .into_iter()
                                .map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
                                .collect::<Result<Vec<_>, Error>>()?,
                        ))
                    })
                    .transpose()?
                    .unwrap_or_default(),
                snapshot_log: value.snapshot_log.unwrap_or_default(),
                metadata_log: value.metadata_log.unwrap_or_default(),
                sort_orders: match value.sort_orders {
                    Some(sort_orders) => HashMap::from_iter(
                        sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
                    ),
                    None => HashMap::new(),
                },
                default_sort_order_id: value
                    .default_sort_order_id
                    .unwrap_or(SortOrder::UNSORTED_ORDER_ID),
                refs: if let Some(snapshot_id) = current_snapshot_id {
                    HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
                        snapshot_id,
                        retention: SnapshotRetention::Branch {
                            min_snapshots_to_keep: None,
                            max_snapshot_age_ms: None,
                            max_ref_age_ms: None,
                        },
                    })])
                } else {
                    HashMap::new()
                },
                statistics: index_statistics(value.statistics),
                partition_statistics: index_partition_statistics(value.partition_statistics),
            };

            metadata.borrow_mut().try_normalize()?;
            Ok(metadata)
        }