in crates/iceberg/src/spec/view_metadata_builder.rs [264:344]
/// Adds `view_version` to the metadata under construction and returns the
/// version id it is stored under.
///
/// The id is obtained from `reuse_or_create_new_view_version_id`. When a
/// version with that id is already present in `metadata.versions`, nothing is
/// inserted; an `AddViewVersion` change is recorded only if that id is not
/// already the builder's `last_added_version_id`.
///
/// A schema id equal to `Self::LAST_ADDED` is replaced by the builder's
/// `last_added_schema_id` before validation. The recorded change uses
/// `Self::LAST_ADDED` again whenever the resolved schema id equals
/// `last_added_schema_id`, while the stored version keeps the resolved id.
///
/// # Errors
///
/// Returns `ErrorKind::DataInvalid` when
/// - the schema id is `Self::LAST_ADDED` but `last_added_schema_id` is `None`,
/// - the schema id is not a key of `metadata.schemas`,
/// - the version's timestamp is more than `ONE_MINUTE_MS` earlier than the
///   last entry of `metadata.version_log`.
///
/// Errors returned by `require_unique_dialects` are propagated unchanged.
fn add_version_internal(&mut self, view_version: ViewVersion) -> Result<i32> {
    let version_id = self.reuse_or_create_new_view_version_id(&view_version);
    let view_version = view_version.with_version_id(version_id);

    if self.metadata.versions.contains_key(&version_id) {
        // ToDo Discuss: Similar to TableMetadata sort-order, Java does not add changes
        // in this case. I prefer to add changes as the state of the builder is
        // potentially mutated (`last_added_version_id`), thus we should record the change.
        if self.last_added_version_id != Some(version_id) {
            self.changes
                .push(ViewUpdate::AddViewVersion { view_version });
            self.last_added_version_id = Some(version_id);
        }
        return Ok(version_id);
    }

    // Resolve the `LAST_ADDED` placeholder to a concrete schema id.
    let view_version = if view_version.schema_id() == Self::LAST_ADDED {
        let last_added_schema_id = self.last_added_schema_id.ok_or_else(|| {
            Error::new(
                ErrorKind::DataInvalid,
                "Cannot set last added schema: no schema has been added",
            )
        })?;
        view_version.with_schema_id(last_added_schema_id)
    } else {
        view_version
    };

    if !self
        .metadata
        .schemas
        .contains_key(&view_version.schema_id())
    {
        return Err(Error::new(
            ErrorKind::DataInvalid,
            format!(
                "Cannot add version with unknown schema: {}",
                view_version.schema_id()
            ),
        ));
    }

    require_unique_dialects(&view_version)?;

    // The `TableMetadataBuilder` uses these checks in multiple places - also in Java.
    // If we think delayed requests are a problem, I think we should also add it here.
    if let Some(last) = self.metadata.version_log.last() {
        // Commits can happen concurrently from different machines.
        // A tolerance helps us avoid failure for small clock skew.
        //
        // `saturating_sub` keeps the comparison meaningful for extreme timestamps:
        // a plain `-` would panic in debug builds and wrap around in release builds.
        let elapsed_ms = view_version
            .timestamp_ms()
            .saturating_sub(last.timestamp_ms());
        if elapsed_ms < -ONE_MINUTE_MS {
            // NOTE(review): the message says "snapshot" although a view version is
            // validated here; the wording is kept unchanged on purpose.
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
                    view_version.timestamp_ms(),
                    last.timestamp_ms()
                ),
            ));
        }
    }

    // The stored version keeps the concrete schema id, so it is cloned before the
    // change record below may swap the id back to the placeholder.
    self.metadata
        .versions
        .insert(version_id, Arc::new(view_version.clone()));

    // Record the change with `LAST_ADDED` when the version refers to the schema
    // tracked in `last_added_schema_id`.
    let view_version = if self.last_added_schema_id == Some(view_version.schema_id()) {
        view_version.with_schema_id(Self::LAST_ADDED)
    } else {
        view_version
    };
    self.changes
        .push(ViewUpdate::AddViewVersion { view_version });
    self.last_added_version_id = Some(version_id);

    Ok(version_id)
}