pyiceberg/table/snapshots.py (343 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import time
import warnings
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import TYPE_CHECKING, Any, DefaultDict, Dict, Iterable, List, Mapping, Optional

from pydantic import Field, PrivateAttr, model_serializer

from pyiceberg.io import FileIO
from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, _manifests
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.utils.deprecated import deprecation_message
if TYPE_CHECKING:
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.typedef import IcebergBaseModel
ADDED_DATA_FILES = "added-data-files"
ADDED_DELETE_FILES = "added-delete-files"
ADDED_EQUALITY_DELETES = "added-equality-deletes"
ADDED_FILE_SIZE = "added-files-size"
ADDED_POSITION_DELETES = "added-position-deletes"
ADDED_POSITION_DELETE_FILES = "added-position-delete-files"
ADDED_RECORDS = "added-records"
DELETED_DATA_FILES = "deleted-data-files"
DELETED_RECORDS = "deleted-records"
ADDED_EQUALITY_DELETE_FILES = "added-equality-delete-files"
REMOVED_DELETE_FILES = "removed-delete-files"
REMOVED_EQUALITY_DELETES = "removed-equality-deletes"
REMOVED_EQUALITY_DELETE_FILES = "removed-equality-delete-files"
REMOVED_FILE_SIZE = "removed-files-size"
REMOVED_POSITION_DELETES = "removed-position-deletes"
REMOVED_POSITION_DELETE_FILES = "removed-position-delete-files"
TOTAL_EQUALITY_DELETES = "total-equality-deletes"
TOTAL_POSITION_DELETES = "total-position-deletes"
TOTAL_DATA_FILES = "total-data-files"
TOTAL_DELETE_FILES = "total-delete-files"
TOTAL_RECORDS = "total-records"
TOTAL_FILE_SIZE = "total-files-size"
CHANGED_PARTITION_COUNT_PROP = "changed-partition-count"
CHANGED_PARTITION_PREFIX = "partitions."
OPERATION = "operation"
INITIAL_SEQUENCE_NUMBER = 0
class Operation(Enum):
"""Describes the operation.
Possible operation values are:
- append: Only data files were added and no files were removed.
- replace: Data and delete files were added and removed without changing table data; i.e., compaction, changing the data file format, or relocating data files.
- overwrite: Data and delete files were added and removed in a logical overwrite operation.
- delete: Data files were removed and their contents logically deleted and/or delete files were added to delete rows.
"""
APPEND = "append"
REPLACE = "replace"
OVERWRITE = "overwrite"
DELETE = "delete"
def __repr__(self) -> str:
"""Return the string representation of the Operation class."""
return f"Operation.{self.name}"
class UpdateMetrics:
added_file_size: int
removed_file_size: int
added_data_files: int
removed_data_files: int
added_eq_delete_files: int
removed_eq_delete_files: int
added_pos_delete_files: int
removed_pos_delete_files: int
added_delete_files: int
removed_delete_files: int
added_records: int
deleted_records: int
added_pos_deletes: int
removed_pos_deletes: int
added_eq_deletes: int
removed_eq_deletes: int
def __init__(self) -> None:
self.added_file_size = 0
self.removed_file_size = 0
self.added_data_files = 0
self.removed_data_files = 0
self.added_eq_delete_files = 0
self.removed_eq_delete_files = 0
self.added_pos_delete_files = 0
self.removed_pos_delete_files = 0
self.added_delete_files = 0
self.removed_delete_files = 0
self.added_records = 0
self.deleted_records = 0
self.added_pos_deletes = 0
self.removed_pos_deletes = 0
self.added_eq_deletes = 0
self.removed_eq_deletes = 0
def add_file(self, data_file: DataFile) -> None:
self.added_file_size += data_file.file_size_in_bytes
if data_file.content == DataFileContent.DATA:
self.added_data_files += 1
self.added_records += data_file.record_count
elif data_file.content == DataFileContent.POSITION_DELETES:
self.added_delete_files += 1
self.added_pos_delete_files += 1
self.added_pos_deletes += data_file.record_count
elif data_file.content == DataFileContent.EQUALITY_DELETES:
self.added_delete_files += 1
self.added_eq_delete_files += 1
self.added_eq_deletes += data_file.record_count
else:
raise ValueError(f"Unknown data file content: {data_file.content}")
def remove_file(self, data_file: DataFile) -> None:
self.removed_file_size += data_file.file_size_in_bytes
if data_file.content == DataFileContent.DATA:
self.removed_data_files += 1
self.deleted_records += data_file.record_count
elif data_file.content == DataFileContent.POSITION_DELETES:
self.removed_delete_files += 1
self.removed_pos_delete_files += 1
self.removed_pos_deletes += data_file.record_count
elif data_file.content == DataFileContent.EQUALITY_DELETES:
self.removed_delete_files += 1
self.removed_eq_delete_files += 1
self.removed_eq_deletes += data_file.record_count
else:
raise ValueError(f"Unknown data file content: {data_file.content}")
def to_dict(self) -> Dict[str, str]:
properties: Dict[str, str] = {}
set_when_positive(properties, self.added_file_size, ADDED_FILE_SIZE)
set_when_positive(properties, self.removed_file_size, REMOVED_FILE_SIZE)
set_when_positive(properties, self.added_data_files, ADDED_DATA_FILES)
set_when_positive(properties, self.removed_data_files, DELETED_DATA_FILES)
set_when_positive(properties, self.added_eq_delete_files, ADDED_EQUALITY_DELETE_FILES)
set_when_positive(properties, self.removed_eq_delete_files, REMOVED_EQUALITY_DELETE_FILES)
set_when_positive(properties, self.added_pos_delete_files, ADDED_POSITION_DELETE_FILES)
set_when_positive(properties, self.removed_pos_delete_files, REMOVED_POSITION_DELETE_FILES)
set_when_positive(properties, self.added_delete_files, ADDED_DELETE_FILES)
set_when_positive(properties, self.removed_delete_files, REMOVED_DELETE_FILES)
set_when_positive(properties, self.added_records, ADDED_RECORDS)
set_when_positive(properties, self.deleted_records, DELETED_RECORDS)
set_when_positive(properties, self.added_pos_deletes, ADDED_POSITION_DELETES)
set_when_positive(properties, self.removed_pos_deletes, REMOVED_POSITION_DELETES)
set_when_positive(properties, self.added_eq_deletes, ADDED_EQUALITY_DELETES)
set_when_positive(properties, self.removed_eq_deletes, REMOVED_EQUALITY_DELETES)
return properties
class Summary(IcebergBaseModel, Mapping[str, str]):
"""A class that stores the summary information for a Snapshot.
The snapshot summary’s operation field is used by some operations,
like snapshot expiration, to skip processing certain snapshots.
"""
operation: Operation = Field()
_additional_properties: Dict[str, str] = PrivateAttr()
def __init__(self, operation: Optional[Operation] = None, **data: Any) -> None:
if operation is None:
warnings.warn("Encountered invalid snapshot summary: operation is missing, defaulting to overwrite")
operation = Operation.OVERWRITE
super().__init__(operation=operation, **data)
self._additional_properties = data
def __getitem__(self, __key: str) -> Optional[Any]: # type: ignore
"""Return a key as it is a map."""
if __key.lower() == "operation":
return self.operation
else:
return self._additional_properties.get(__key)
def __setitem__(self, key: str, value: Any) -> None:
"""Set a key as it is a map."""
if key.lower() == "operation":
self.operation = value
else:
self._additional_properties[key] = value
def __len__(self) -> int:
"""Return the number of keys in the summary."""
# Operation is required
return 1 + len(self._additional_properties)
@model_serializer
def ser_model(self) -> Dict[str, str]:
return {
"operation": str(self.operation.value),
**self._additional_properties,
}
@property
def additional_properties(self) -> Dict[str, str]:
return self._additional_properties
def __repr__(self) -> str:
"""Return the string representation of the Summary class."""
repr_properties = f", **{repr(self._additional_properties)}" if self._additional_properties else ""
return f"Summary({repr(self.operation)}{repr_properties})"
def __eq__(self, other: Any) -> bool:
"""Compare if the summary is equal to another summary."""
return (
self.operation == other.operation and self.additional_properties == other.additional_properties
if isinstance(other, Summary)
else False
)
class Snapshot(IcebergBaseModel):
snapshot_id: int = Field(alias="snapshot-id")
parent_snapshot_id: Optional[int] = Field(alias="parent-snapshot-id", default=None)
sequence_number: Optional[int] = Field(alias="sequence-number", default=INITIAL_SEQUENCE_NUMBER)
timestamp_ms: int = Field(alias="timestamp-ms", default_factory=lambda: int(time.time() * 1000))
manifest_list: str = Field(alias="manifest-list", description="Location of the snapshot's manifest list file")
summary: Optional[Summary] = Field(default=None)
schema_id: Optional[int] = Field(alias="schema-id", default=None)
def __str__(self) -> str:
"""Return the string representation of the Snapshot class."""
operation = f"{self.summary.operation}: " if self.summary else ""
parent_id = f", parent_id={self.parent_snapshot_id}" if self.parent_snapshot_id else ""
schema_id = f", schema_id={self.schema_id}" if self.schema_id is not None else ""
result_str = f"{operation}id={self.snapshot_id}{parent_id}{schema_id}"
return result_str
def manifests(self, io: FileIO) -> List[ManifestFile]:
"""Return the manifests for the given snapshot."""
return list(_manifests(io, self.manifest_list))
class MetadataLogEntry(IcebergBaseModel):
metadata_file: str = Field(alias="metadata-file")
timestamp_ms: int = Field(alias="timestamp-ms")
class SnapshotLogEntry(IcebergBaseModel):
snapshot_id: int = Field(alias="snapshot-id")
timestamp_ms: int = Field(alias="timestamp-ms")
class SnapshotSummaryCollector:
metrics: UpdateMetrics
partition_metrics: DefaultDict[str, UpdateMetrics]
max_changed_partitions_for_summaries: int
def __init__(self, partition_summary_limit: int = 0) -> None:
self.metrics = UpdateMetrics()
self.partition_metrics = defaultdict(UpdateMetrics)
self.max_changed_partitions_for_summaries = partition_summary_limit
def set_partition_summary_limit(self, limit: int) -> None:
self.max_changed_partitions_for_summaries = limit
def add_file(self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC) -> None:
self.metrics.add_file(data_file)
if len(data_file.partition) > 0:
self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=True, schema=schema)
def remove_file(
self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC
) -> None:
self.metrics.remove_file(data_file)
if len(data_file.partition) > 0:
self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=False, schema=schema)
def update_partition_metrics(self, partition_spec: PartitionSpec, file: DataFile, is_add_file: bool, schema: Schema) -> None:
partition_path = partition_spec.partition_to_path(file.partition, schema)
partition_metrics: UpdateMetrics = self.partition_metrics[partition_path]
if is_add_file:
partition_metrics.add_file(file)
else:
partition_metrics.remove_file(file)
def build(self) -> Dict[str, str]:
properties = self.metrics.to_dict()
changed_partitions_size = len(self.partition_metrics)
set_when_positive(properties, changed_partitions_size, CHANGED_PARTITION_COUNT_PROP)
if changed_partitions_size <= self.max_changed_partitions_for_summaries:
for partition_path, update_metrics_partition in self.partition_metrics.items():
if (summary := self._partition_summary(update_metrics_partition)) and len(summary) != 0:
properties[CHANGED_PARTITION_PREFIX + partition_path] = summary
return properties
def _partition_summary(self, update_metrics: UpdateMetrics) -> str:
return ",".join([f"{prop}={val}" for prop, val in update_metrics.to_dict().items()])
def _truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary:
for prop in {
TOTAL_DATA_FILES,
TOTAL_DELETE_FILES,
TOTAL_RECORDS,
TOTAL_FILE_SIZE,
TOTAL_POSITION_DELETES,
TOTAL_EQUALITY_DELETES,
}:
summary[prop] = "0"
def get_prop(prop: str) -> int:
value = previous_summary.get(prop) or "0"
try:
return int(value)
except ValueError as e:
raise ValueError(f"Could not parse summary property {prop} to an int: {value}") from e
if value := get_prop(TOTAL_DATA_FILES):
summary[DELETED_DATA_FILES] = str(value)
if value := get_prop(TOTAL_DELETE_FILES):
summary[REMOVED_DELETE_FILES] = str(value)
if value := get_prop(TOTAL_RECORDS):
summary[DELETED_RECORDS] = str(value)
if value := get_prop(TOTAL_FILE_SIZE):
summary[REMOVED_FILE_SIZE] = str(value)
if value := get_prop(TOTAL_POSITION_DELETES):
summary[REMOVED_POSITION_DELETES] = str(value)
if value := get_prop(TOTAL_EQUALITY_DELETES):
summary[REMOVED_EQUALITY_DELETES] = str(value)
return summary
def update_snapshot_summaries(
summary: Summary, previous_summary: Optional[Mapping[str, str]] = None, truncate_full_table: bool = False
) -> Summary:
if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE}:
raise ValueError(f"Operation not implemented: {summary.operation}")
if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None:
deprecation_message(
deprecated_in="0.10.0",
removed_in="0.11.0",
help_message="The truncate-full-table shouldn't be used.",
)
summary = _truncate_table_summary(summary, previous_summary)
if not previous_summary:
previous_summary = {
TOTAL_DATA_FILES: "0",
TOTAL_DELETE_FILES: "0",
TOTAL_RECORDS: "0",
TOTAL_FILE_SIZE: "0",
TOTAL_POSITION_DELETES: "0",
TOTAL_EQUALITY_DELETES: "0",
}
def _update_totals(total_property: str, added_property: str, removed_property: str) -> None:
if previous_total_str := previous_summary.get(total_property):
try:
new_total = int(previous_total_str)
if new_total >= 0 and (added := summary.get(added_property)):
new_total += int(added)
if new_total >= 0 and (removed := summary.get(removed_property)):
new_total -= int(removed)
except ValueError as e:
raise ValueError(f"Could not parse summary property {total_property} to an int: {previous_total_str}") from e
if new_total >= 0:
summary[total_property] = str(new_total)
_update_totals(
total_property=TOTAL_DATA_FILES,
added_property=ADDED_DATA_FILES,
removed_property=DELETED_DATA_FILES,
)
_update_totals(
total_property=TOTAL_DELETE_FILES,
added_property=ADDED_DELETE_FILES,
removed_property=REMOVED_DELETE_FILES,
)
_update_totals(
total_property=TOTAL_RECORDS,
added_property=ADDED_RECORDS,
removed_property=DELETED_RECORDS,
)
_update_totals(
total_property=TOTAL_FILE_SIZE,
added_property=ADDED_FILE_SIZE,
removed_property=REMOVED_FILE_SIZE,
)
_update_totals(
total_property=TOTAL_POSITION_DELETES,
added_property=ADDED_POSITION_DELETES,
removed_property=REMOVED_POSITION_DELETES,
)
_update_totals(
total_property=TOTAL_EQUALITY_DELETES,
added_property=ADDED_EQUALITY_DELETES,
removed_property=REMOVED_EQUALITY_DELETES,
)
return summary
def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
if num > 0:
properties[property_name] = str(num)
def ancestors_of(current_snapshot: Optional[Snapshot], table_metadata: TableMetadata) -> Iterable[Snapshot]:
"""Get the ancestors of and including the given snapshot."""
snapshot = current_snapshot
while snapshot is not None:
yield snapshot
if snapshot.parent_snapshot_id is None:
break
snapshot = table_metadata.snapshot_by_id(snapshot.parent_snapshot_id)
def ancestors_between(
from_snapshot: Optional[Snapshot], to_snapshot: Snapshot, table_metadata: TableMetadata
) -> Iterable[Snapshot]:
"""Get the ancestors of and including the given snapshot between the to and from snapshots."""
if from_snapshot is not None:
for snapshot in ancestors_of(to_snapshot, table_metadata):
yield snapshot
if snapshot == from_snapshot:
break
else:
yield from ancestors_of(to_snapshot, table_metadata)