in pyiceberg/table/update/__init__.py [0:0]
def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
if len(base_metadata.schemas) == 0:
raise ValueError("Attempting to add a snapshot before a schema is added")
elif len(base_metadata.partition_specs) == 0:
raise ValueError("Attempting to add a snapshot before a partition spec is added")
elif len(base_metadata.sort_orders) == 0:
raise ValueError("Attempting to add a snapshot before a sort order is added")
elif base_metadata.snapshot_by_id(update.snapshot.snapshot_id) is not None:
raise ValueError(f"Snapshot with id {update.snapshot.snapshot_id} already exists")
elif (
base_metadata.format_version == 2
and update.snapshot.sequence_number is not None
and update.snapshot.sequence_number <= base_metadata.last_sequence_number
and update.snapshot.parent_snapshot_id is not None
):
raise ValueError(
f"Cannot add snapshot with sequence number {update.snapshot.sequence_number} "
f"older than last sequence number {base_metadata.last_sequence_number}"
)
context.add_update(update)
return base_metadata.model_copy(
update={
"last_updated_ms": update.snapshot.timestamp_ms,
"last_sequence_number": update.snapshot.sequence_number,
"snapshots": base_metadata.snapshots + [update.snapshot],
}
)