in pyiceberg/table/update/__init__.py [0:0]
def _(update: RemoveSnapshotsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
for remove_snapshot_id in update.snapshot_ids:
if not any(snapshot.snapshot_id == remove_snapshot_id for snapshot in base_metadata.snapshots):
raise ValueError(f"Snapshot with snapshot id {remove_snapshot_id} does not exist: {base_metadata.snapshots}")
snapshots = [
(
snapshot.model_copy(update={"parent_snapshot_id": None})
if snapshot.parent_snapshot_id in update.snapshot_ids
else snapshot
)
for snapshot in base_metadata.snapshots
if snapshot.snapshot_id not in update.snapshot_ids
]
snapshot_log = [
snapshot_log_entry
for snapshot_log_entry in base_metadata.snapshot_log
if snapshot_log_entry.snapshot_id not in update.snapshot_ids
]
remove_ref_updates = (
RemoveSnapshotRefUpdate(ref_name=ref_name)
for ref_name, ref in base_metadata.refs.items()
if ref.snapshot_id in update.snapshot_ids
)
remove_statistics_updates = (
RemoveStatisticsUpdate(statistics_file.snapshot_id)
for statistics_file in base_metadata.statistics
if statistics_file.snapshot_id in update.snapshot_ids
)
updates = itertools.chain(remove_ref_updates, remove_statistics_updates)
new_metadata = base_metadata
for upd in updates:
new_metadata = _apply_table_update(upd, new_metadata, context)
context.add_update(update)
return new_metadata.model_copy(update={"snapshots": snapshots, "snapshot_log": snapshot_log})