in crates/iceberg/src/spec/table_metadata.rs [917:1068]
fn try_from(value: TableMetadataV1) -> Result<Self, Error> {
let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id {
None
} else {
value.current_snapshot_id
};
let (schemas, current_schema_id, current_schema) =
if let (Some(schemas_vec), Some(schema_id)) =
(&value.schemas, value.current_schema_id)
{
// Option 1: Use 'schemas' + 'current_schema_id'
let schema_map = HashMap::from_iter(
schemas_vec
.clone()
.into_iter()
.map(|schema| {
let schema: Schema = schema.try_into()?;
Ok((schema.schema_id(), Arc::new(schema)))
})
.collect::<Result<Vec<_>, Error>>()?,
);
let schema = schema_map
.get(&schema_id)
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!(
"No schema exists with the current schema id {}.",
schema_id
),
)
})?
.clone();
(schema_map, schema_id, schema)
} else if let Some(schema) = value.schema {
// Option 2: Fall back to `schema`
let schema: Schema = schema.try_into()?;
let schema_id = schema.schema_id();
let schema_arc = Arc::new(schema);
let schema_map = HashMap::from_iter(vec![(schema_id, schema_arc.clone())]);
(schema_map, schema_id, schema_arc)
} else {
// Option 3: No valid schema configuration found
return Err(Error::new(
ErrorKind::DataInvalid,
"No valid schema configuration found in table metadata",
));
};
// Prioritize 'partition_specs' over 'partition_spec'
let partition_specs = if let Some(specs_vec) = value.partition_specs {
// Option 1: Use 'partition_specs'
specs_vec
.into_iter()
.map(|x| (x.spec_id(), Arc::new(x)))
.collect::<HashMap<_, _>>()
} else if let Some(partition_spec) = value.partition_spec {
// Option 2: Fall back to 'partition_spec'
let spec = PartitionSpec::builder(current_schema.clone())
.with_spec_id(DEFAULT_PARTITION_SPEC_ID)
.add_unbound_fields(partition_spec.into_iter().map(|f| f.into_unbound()))?
.build()?;
HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
} else {
// Option 3: Create empty partition spec
let spec = PartitionSpec::builder(current_schema.clone())
.with_spec_id(DEFAULT_PARTITION_SPEC_ID)
.build()?;
HashMap::from_iter(vec![(DEFAULT_PARTITION_SPEC_ID, Arc::new(spec))])
};
// Get the default_spec_id, prioritizing the explicit value if provided
let default_spec_id = value
.default_spec_id
.unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default());
// Get the default spec
let default_spec: PartitionSpecRef = partition_specs
.get(&default_spec_id)
.map(|x| Arc::unwrap_or_clone(x.clone()))
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Default partition spec {default_spec_id} not found"),
)
})?
.into();
let default_partition_type = default_spec.partition_type(¤t_schema)?;
let mut metadata = TableMetadata {
format_version: FormatVersion::V1,
table_uuid: value.table_uuid.unwrap_or_default(),
location: value.location,
last_sequence_number: 0,
last_updated_ms: value.last_updated_ms,
last_column_id: value.last_column_id,
current_schema_id,
default_spec,
default_partition_type,
last_partition_id: value
.last_partition_id
.unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()),
partition_specs,
schemas,
properties: value.properties.unwrap_or_default(),
current_snapshot_id,
snapshots: value
.snapshots
.map(|snapshots| {
Ok::<_, Error>(HashMap::from_iter(
snapshots
.into_iter()
.map(|x| Ok((x.snapshot_id, Arc::new(x.try_into()?))))
.collect::<Result<Vec<_>, Error>>()?,
))
})
.transpose()?
.unwrap_or_default(),
snapshot_log: value.snapshot_log.unwrap_or_default(),
metadata_log: value.metadata_log.unwrap_or_default(),
sort_orders: match value.sort_orders {
Some(sort_orders) => HashMap::from_iter(
sort_orders.into_iter().map(|x| (x.order_id, Arc::new(x))),
),
None => HashMap::new(),
},
default_sort_order_id: value
.default_sort_order_id
.unwrap_or(SortOrder::UNSORTED_ORDER_ID),
refs: if let Some(snapshot_id) = current_snapshot_id {
HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference {
snapshot_id,
retention: SnapshotRetention::Branch {
min_snapshots_to_keep: None,
max_snapshot_age_ms: None,
max_ref_age_ms: None,
},
})])
} else {
HashMap::new()
},
statistics: index_statistics(value.statistics),
partition_statistics: index_partition_statistics(value.partition_statistics),
};
metadata.borrow_mut().try_normalize()?;
Ok(metadata)
}