in pyiceberg/table/update/__init__.py [0:0]
def _(update: SetSnapshotRefUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
snapshot_ref = SnapshotRef(
snapshot_id=update.snapshot_id,
snapshot_ref_type=update.type,
min_snapshots_to_keep=update.min_snapshots_to_keep,
max_snapshot_age_ms=update.max_snapshot_age_ms,
max_ref_age_ms=update.max_ref_age_ms,
)
existing_ref = base_metadata.refs.get(update.ref_name)
if existing_ref is not None and existing_ref == snapshot_ref:
return base_metadata
snapshot = base_metadata.snapshot_by_id(snapshot_ref.snapshot_id)
if snapshot is None:
raise ValueError(f"Cannot set {update.ref_name} to unknown snapshot {snapshot_ref.snapshot_id}")
metadata_updates: Dict[str, Any] = {}
if context.is_added_snapshot(snapshot_ref.snapshot_id):
metadata_updates["last_updated_ms"] = snapshot.timestamp_ms
if update.ref_name == MAIN_BRANCH:
metadata_updates["current_snapshot_id"] = snapshot_ref.snapshot_id
if "last_updated_ms" not in metadata_updates:
metadata_updates["last_updated_ms"] = datetime_to_millis(datetime.now().astimezone())
metadata_updates["snapshot_log"] = base_metadata.snapshot_log + [
SnapshotLogEntry(
snapshot_id=snapshot_ref.snapshot_id,
timestamp_ms=metadata_updates["last_updated_ms"],
)
]
metadata_updates["refs"] = {**base_metadata.refs, update.ref_name: snapshot_ref}
context.add_update(update)
return base_metadata.model_copy(update=metadata_updates)