def _()

in pyiceberg/table/update/__init__.py [0:0]


def _(update: RemoveSnapshotsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    for remove_snapshot_id in update.snapshot_ids:
        if not any(snapshot.snapshot_id == remove_snapshot_id for snapshot in base_metadata.snapshots):
            raise ValueError(f"Snapshot with snapshot id {remove_snapshot_id} does not exist: {base_metadata.snapshots}")

    snapshots = [
        (
            snapshot.model_copy(update={"parent_snapshot_id": None})
            if snapshot.parent_snapshot_id in update.snapshot_ids
            else snapshot
        )
        for snapshot in base_metadata.snapshots
        if snapshot.snapshot_id not in update.snapshot_ids
    ]
    snapshot_log = [
        snapshot_log_entry
        for snapshot_log_entry in base_metadata.snapshot_log
        if snapshot_log_entry.snapshot_id not in update.snapshot_ids
    ]

    remove_ref_updates = (
        RemoveSnapshotRefUpdate(ref_name=ref_name)
        for ref_name, ref in base_metadata.refs.items()
        if ref.snapshot_id in update.snapshot_ids
    )
    remove_statistics_updates = (
        RemoveStatisticsUpdate(statistics_file.snapshot_id)
        for statistics_file in base_metadata.statistics
        if statistics_file.snapshot_id in update.snapshot_ids
    )
    updates = itertools.chain(remove_ref_updates, remove_statistics_updates)
    new_metadata = base_metadata
    for upd in updates:
        new_metadata = _apply_table_update(upd, new_metadata, context)

    context.add_update(update)
    return new_metadata.model_copy(update={"snapshots": snapshots, "snapshot_log": snapshot_log})