pyiceberg/manifest.py (887 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import annotations import math from abc import ABC, abstractmethod from copy import copy from enum import Enum from types import TracebackType from typing import ( Any, Dict, Iterator, List, Literal, Optional, Tuple, Type, Union, ) from cachetools import LRUCache, cached from cachetools.keys import hashkey from pydantic_core import to_json from pyiceberg.avro.file import AvroFile, AvroOutputFile from pyiceberg.conversions import to_bytes from pyiceberg.exceptions import ValidationError from pyiceberg.io import FileIO, InputFile, OutputFile from pyiceberg.partitioning import PartitionSpec from pyiceberg.schema import Schema from pyiceberg.typedef import Record, TableVersion from pyiceberg.types import ( BinaryType, BooleanType, IntegerType, ListType, LongType, MapType, NestedField, PrimitiveType, StringType, StructType, ) UNASSIGNED_SEQ = -1 DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024 DEFAULT_READ_VERSION: Literal[2] = 2 INITIAL_SEQUENCE_NUMBER = 0 class DataFileContent(int, Enum): DATA = 0 POSITION_DELETES = 1 EQUALITY_DELETES = 2 def __repr__(self) -> str: """Return the string representation of the DataFileContent class.""" return f"DataFileContent.{self.name}" class ManifestContent(int, Enum): DATA = 0 DELETES = 1 def __repr__(self) -> str: """Return the string representation of the ManifestContent class.""" return f"ManifestContent.{self.name}" class ManifestEntryStatus(int, Enum): EXISTING = 0 ADDED = 1 DELETED = 2 def __repr__(self) -> str: """Return the string representation of the ManifestEntryStatus class.""" return f"ManifestEntryStatus.{self.name}" class FileFormat(str, Enum): AVRO = "AVRO" PARQUET = "PARQUET" ORC = "ORC" PUFFIN = "PUFFIN" @classmethod def _missing_(cls, value: object) -> Union[None, str]: for member in cls: if member.value == str(value).upper(): return member return None def __repr__(self) -> str: """Return the string representation of the FileFormat class.""" return f"FileFormat.{self.name}" DATA_FILE_TYPE: Dict[int, StructType] = { 1: StructType( NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"), NestedField( field_id=101, name="file_format", field_type=StringType(), required=True, doc="File format name: avro, orc, or parquet", ), NestedField( field_id=102, name="partition", field_type=StructType(), required=True, doc="Partition data tuple, schema based on the partition spec", ), NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"), NestedField( field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes" ), NestedField( field_id=105, name="block_size_in_bytes", field_type=LongType(), required=True, doc="Deprecated. Always write a default in v1. Do not write in v2.", write_default=DEFAULT_BLOCK_SIZE, ), NestedField( field_id=108, name="column_sizes", field_type=MapType(key_id=117, key_type=IntegerType(), value_id=118, value_type=LongType()), required=False, doc="Map of column id to total size on disk", ), NestedField( field_id=109, name="value_counts", field_type=MapType(key_id=119, key_type=IntegerType(), value_id=120, value_type=LongType()), required=False, doc="Map of column id to total count, including null and NaN", ), NestedField( field_id=110, name="null_value_counts", field_type=MapType(key_id=121, key_type=IntegerType(), value_id=122, value_type=LongType()), required=False, doc="Map of column id to null value count", ), NestedField( field_id=137, name="nan_value_counts", field_type=MapType(key_id=138, key_type=IntegerType(), value_id=139, value_type=LongType()), required=False, doc="Map of column id to number of NaN values in the column", ), NestedField( field_id=125, name="lower_bounds", field_type=MapType(key_id=126, key_type=IntegerType(), value_id=127, value_type=BinaryType()), required=False, doc="Map of column id to lower bound", ), NestedField( field_id=128, name="upper_bounds", field_type=MapType(key_id=129, key_type=IntegerType(), value_id=130, value_type=BinaryType()), required=False, doc="Map of column id to upper bound", ), NestedField( field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob" ), NestedField( field_id=132, name="split_offsets", field_type=ListType(element_id=133, element_type=LongType(), element_required=True), required=False, doc="Splittable offsets", ), NestedField(field_id=140, name="sort_order_id", field_type=IntegerType(), required=False, doc="Sort order ID"), ), 2: StructType( NestedField( field_id=134, name="content", field_type=IntegerType(), required=True, doc="File format name: avro, orc, or parquet", initial_default=DataFileContent.DATA, ), NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"), NestedField( field_id=101, name="file_format", field_type=StringType(), required=True, doc="File format name: avro, orc, or parquet", ), NestedField( field_id=102, name="partition", field_type=StructType(), required=True, doc="Partition data tuple, schema based on the partition spec", ), NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"), NestedField( field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes" ), NestedField( field_id=108, name="column_sizes", field_type=MapType(key_id=117, key_type=IntegerType(), value_id=118, value_type=LongType()), required=False, doc="Map of column id to total size on disk", ), NestedField( field_id=109, name="value_counts", field_type=MapType(key_id=119, key_type=IntegerType(), value_id=120, value_type=LongType()), required=False, doc="Map of column id to total count, including null and NaN", ), NestedField( field_id=110, name="null_value_counts", field_type=MapType(key_id=121, key_type=IntegerType(), value_id=122, value_type=LongType()), required=False, doc="Map of column id to null value count", ), NestedField( field_id=137, name="nan_value_counts", field_type=MapType(key_id=138, key_type=IntegerType(), value_id=139, value_type=LongType()), required=False, doc="Map of column id to number of NaN values in the column", ), NestedField( field_id=125, name="lower_bounds", field_type=MapType(key_id=126, key_type=IntegerType(), value_id=127, value_type=BinaryType()), required=False, doc="Map of column id to lower bound", ), NestedField( field_id=128, name="upper_bounds", field_type=MapType(key_id=129, key_type=IntegerType(), value_id=130, value_type=BinaryType()), required=False, doc="Map of column id to upper bound", ), NestedField( field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob" ), NestedField( field_id=132, name="split_offsets", field_type=ListType(element_id=133, element_type=LongType(), element_required=True), required=False, doc="Splittable offsets", ), NestedField( field_id=135, name="equality_ids", field_type=ListType(element_id=136, element_type=LongType(), element_required=True), required=False, doc="Field ids used to determine row equality in equality delete files.", ), NestedField( field_id=140, name="sort_order_id", field_type=IntegerType(), required=False, doc="ID representing sort order for this file", ), ), } def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType: data_file_partition_type = StructType( *[ NestedField( field_id=field.field_id, name=field.name, field_type=field.field_type, required=field.required, ) for field in partition_type.fields ] ) return StructType( *[ NestedField( field_id=102, name="partition", field_type=data_file_partition_type, required=True, doc="Partition data tuple, schema based on the partition spec", ) if field.field_id == 102 else field for field in DATA_FILE_TYPE[format_version].fields ] ) class DataFile(Record): @classmethod def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> DataFile: struct = DATA_FILE_TYPE[_table_format_version] return super()._bind(struct, **arguments) @property def content(self) -> DataFileContent: return self._data[0] @property def file_path(self) -> str: return self._data[1] @property def file_format(self) -> FileFormat: return self._data[2] @property def partition(self) -> Record: return self._data[3] @property def record_count(self) -> int: return self._data[4] @property def file_size_in_bytes(self) -> int: return self._data[5] @property def column_sizes(self) -> Dict[int, int]: return self._data[6] @property def value_counts(self) -> Dict[int, int]: return self._data[7] @property def null_value_counts(self) -> Dict[int, int]: return self._data[8] @property def nan_value_counts(self) -> Dict[int, int]: return self._data[9] @property def lower_bounds(self) -> Dict[int, bytes]: return self._data[10] @property def upper_bounds(self) -> Dict[int, bytes]: return self._data[11] @property def key_metadata(self) -> Optional[bytes]: return self._data[12] @property def split_offsets(self) -> Optional[List[int]]: return self._data[13] @property def equality_ids(self) -> Optional[List[int]]: return self._data[14] @property def sort_order_id(self) -> Optional[int]: return self._data[15] # Spec ID should not be stored in the file _spec_id: int @property def spec_id(self) -> int: return self._spec_id @spec_id.setter def spec_id(self, value: int) -> None: self._spec_id = value def __setattr__(self, name: str, value: Any) -> None: """Assign a key/value to a DataFile.""" # The file_format is written as a string, so we need to cast it to the Enum if name == "file_format": value = FileFormat[value] super().__setattr__(name, value) def __hash__(self) -> int: """Return the hash of the file path.""" return hash(self.file_path) def __eq__(self, other: Any) -> bool: """Compare the datafile with another object. If it is a datafile, it will compare based on the file_path. """ return self.file_path == other.file_path if isinstance(other, DataFile) else False MANIFEST_ENTRY_SCHEMAS = { 1: Schema( NestedField(0, "status", IntegerType(), required=True), NestedField(1, "snapshot_id", LongType(), required=True), NestedField(2, "data_file", DATA_FILE_TYPE[1], required=True), ), 2: Schema( NestedField(0, "status", IntegerType(), required=True), NestedField(1, "snapshot_id", LongType(), required=False), NestedField(3, "sequence_number", LongType(), required=False), NestedField(4, "file_sequence_number", LongType(), required=False), NestedField(2, "data_file", DATA_FILE_TYPE[2], required=True), ), } MANIFEST_ENTRY_SCHEMAS_STRUCT = {format_version: schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items()} def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file: StructType) -> Schema: return Schema( *[ NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields ] ) class ManifestEntry(Record): @classmethod def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> ManifestEntry: return super()._bind(**arguments, struct=MANIFEST_ENTRY_SCHEMAS_STRUCT[_table_format_version]) @property def status(self) -> ManifestEntryStatus: return self._data[0] @status.setter def status(self, value: ManifestEntryStatus) -> None: self._data[0] = value @property def snapshot_id(self) -> Optional[int]: return self._data[1] @snapshot_id.setter def snapshot_id(self, value: int) -> None: self._data[0] = value @property def sequence_number(self) -> Optional[int]: return self._data[2] @sequence_number.setter def sequence_number(self, value: int) -> None: self._data[2] = value @property def file_sequence_number(self) -> Optional[int]: return self._data[3] @file_sequence_number.setter def file_sequence_number(self, value: int) -> None: self._data[3] = value @property def data_file(self) -> DataFile: return self._data[4] @data_file.setter def data_file(self, value: DataFile) -> None: self._data[4] = value PARTITION_FIELD_SUMMARY_TYPE = StructType( NestedField(509, "contains_null", BooleanType(), required=True), NestedField(518, "contains_nan", BooleanType(), required=False), NestedField(510, "lower_bound", BinaryType(), required=False), NestedField(511, "upper_bound", BinaryType(), required=False), ) class PartitionFieldSummary(Record): @classmethod def from_args(cls, **arguments: Any) -> PartitionFieldSummary: return super()._bind(**arguments, struct=PARTITION_FIELD_SUMMARY_TYPE) @property def contains_null(self) -> bool: return self._data[0] @property def contains_nan(self) -> Optional[bool]: return self._data[1] @property def lower_bound(self) -> Optional[bytes]: return self._data[2] @property def upper_bound(self) -> Optional[bytes]: return self._data[3] class PartitionFieldStats: _type: PrimitiveType _contains_null: bool _contains_nan: bool _min: Optional[Any] _max: Optional[Any] def __init__(self, iceberg_type: PrimitiveType) -> None: self._type = iceberg_type self._contains_null = False self._contains_nan = False self._min = None self._max = None def to_summary(self) -> PartitionFieldSummary: return PartitionFieldSummary( self._contains_null, self._contains_nan, to_bytes(self._type, self._min) if self._min is not None else None, to_bytes(self._type, self._max) if self._max is not None else None, ) def update(self, value: Any) -> None: if value is None: self._contains_null = True elif isinstance(value, float) and math.isnan(value): self._contains_nan = True else: if self._min is None: self._min = value self._max = value else: self._max = max(self._max, value) self._min = min(self._min, value) def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partitions: List[Record]) -> List[PartitionFieldSummary]: types = [field.field_type for field in spec.partition_type(schema).fields] field_stats = [PartitionFieldStats(field_type) for field_type in types] for partition_keys in partitions: for i, field_type in enumerate(types): if not isinstance(field_type, PrimitiveType): raise ValueError(f"Expected a primitive type for the partition field, got {field_type}") partition_key = partition_keys[i] field_stats[i].update(partition_key) return [field.to_summary() for field in field_stats] MANIFEST_LIST_FILE_SCHEMAS: Dict[int, Schema] = { 1: Schema( NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"), NestedField(501, "manifest_length", LongType(), required=True), NestedField(502, "partition_spec_id", IntegerType(), required=True), NestedField(503, "added_snapshot_id", LongType(), required=True), NestedField(504, "added_files_count", IntegerType(), required=False), NestedField(505, "existing_files_count", IntegerType(), required=False), NestedField(506, "deleted_files_count", IntegerType(), required=False), NestedField(512, "added_rows_count", LongType(), required=False), NestedField(513, "existing_rows_count", LongType(), required=False), NestedField(514, "deleted_rows_count", LongType(), required=False), NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False), NestedField(519, "key_metadata", BinaryType(), required=False), ), 2: Schema( NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"), NestedField(501, "manifest_length", LongType(), required=True), NestedField(502, "partition_spec_id", IntegerType(), required=True), NestedField(517, "content", IntegerType(), required=True, initial_default=ManifestContent.DATA), NestedField(515, "sequence_number", LongType(), required=True, initial_default=0), NestedField(516, "min_sequence_number", LongType(), required=True, initial_default=0), NestedField(503, "added_snapshot_id", LongType(), required=True), NestedField(504, "added_files_count", IntegerType(), required=True), NestedField(505, "existing_files_count", IntegerType(), required=True), NestedField(506, "deleted_files_count", IntegerType(), required=True), NestedField(512, "added_rows_count", LongType(), required=True), NestedField(513, "existing_rows_count", LongType(), required=True), NestedField(514, "deleted_rows_count", LongType(), required=True), NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False), NestedField(519, "key_metadata", BinaryType(), required=False), ), } MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()} POSITIONAL_DELETE_SCHEMA = Schema( NestedField(2147483546, "file_path", StringType()), NestedField(2147483545, "pos", IntegerType()) ) class ManifestFile(Record): @classmethod def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> ManifestFile: return super()._bind(**arguments, struct=MANIFEST_LIST_FILE_SCHEMAS[_table_format_version]) @property def manifest_path(self) -> str: return self._data[0] @property def manifest_length(self) -> int: return self._data[1] @property def partition_spec_id(self) -> int: return self._data[2] @property def content(self) -> ManifestContent: return self._data[3] @property def sequence_number(self) -> int: return self._data[4] @sequence_number.setter def sequence_number(self, value: int) -> None: self._data[4] = value @property def min_sequence_number(self) -> int: return self._data[5] @min_sequence_number.setter def min_sequence_number(self, value: int) -> None: self._data[5] = value @property def added_snapshot_id(self) -> Optional[int]: return self._data[6] @property def added_files_count(self) -> Optional[int]: return self._data[7] @property def existing_files_count(self) -> Optional[int]: return self._data[8] @property def deleted_files_count(self) -> Optional[int]: return self._data[9] @property def added_rows_count(self) -> Optional[int]: return self._data[10] @property def existing_rows_count(self) -> Optional[int]: return self._data[11] @property def deleted_rows_count(self) -> Optional[int]: return self._data[12] @property def partitions(self) -> Optional[List[PartitionFieldSummary]]: return self._data[13] @property def key_metadata(self) -> Optional[bytes]: return self._data[14] def has_added_files(self) -> bool: return self.added_files_count is None or self.added_files_count > 0 def has_existing_files(self) -> bool: return self.existing_files_count is None or self.existing_files_count > 0 def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List[ManifestEntry]: """ Read the manifest entries from the manifest file. Args: io: The FileIO to fetch the file. discard_deleted: Filter on live entries. Returns: An Iterator of manifest entries. """ input_file = io.new_input(self.manifest_path) with AvroFile[ManifestEntry]( input_file, MANIFEST_ENTRY_SCHEMAS[DEFAULT_READ_VERSION], read_types={-1: ManifestEntry, 2: DataFile}, read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent}, ) as reader: return [ _inherit_from_manifest(entry, self) for entry in reader if not discard_deleted or entry.status != ManifestEntryStatus.DELETED ] @cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list)) def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]: """Read and cache manifests from the given manifest list, returning a tuple to prevent modification.""" file = io.new_input(manifest_list) return tuple(read_manifest_list(file)) def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]: """ Read the manifests from the manifest list. Args: input_file: The input file where the stream can be read from. Returns: An iterator of ManifestFiles that are part of the list. """ with AvroFile[ManifestFile]( input_file, MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION], read_types={-1: ManifestFile, 508: PartitionFieldSummary}, read_enums={517: ManifestContent}, ) as reader: yield from reader def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> ManifestEntry: """ Inherits properties from manifest file. The properties that will be inherited are: - sequence numbers - partition spec id. More information about inheriting sequence numbers: https://iceberg.apache.org/spec/#sequence-number-inheritance Args: entry: The manifest entry. manifest: The manifest file. Returns: The manifest entry with properties inherited. """ # Inherit sequence numbers. # The snapshot_id is required in V1, inherit with V2 when null if entry.snapshot_id is None: entry.snapshot_id = manifest.added_snapshot_id # in v1 tables, the sequence number is not persisted and can be safely defaulted to 0 # in v2 tables, the sequence number should be inherited iff the entry status is ADDED if entry.sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED): entry.sequence_number = manifest.sequence_number # in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0 # in v2 tables, the file sequence number should be inherited iff the entry status is ADDED if entry.file_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED): # Only available in V2, always 0 in V1 entry.file_sequence_number = manifest.sequence_number # Inherit partition spec id. entry.data_file.spec_id = manifest.partition_spec_id return entry class ManifestWriter(ABC): closed: bool _spec: PartitionSpec _schema: Schema _output_file: OutputFile _writer: AvroOutputFile[ManifestEntry] _snapshot_id: int _added_files: int _added_rows: int _existing_files: int _existing_rows: int _deleted_files: int _deleted_rows: int _min_sequence_number: Optional[int] _partitions: List[Record] _reused_entry_wrapper: ManifestEntry def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int) -> None: self.closed = False self._spec = spec self._schema = schema self._output_file = output_file self._snapshot_id = snapshot_id self._added_files = 0 self._added_rows = 0 self._existing_files = 0 self._existing_rows = 0 self._deleted_files = 0 self._deleted_rows = 0 self._min_sequence_number = None self._partitions = [] def __enter__(self) -> ManifestWriter: """Open the writer.""" self._writer = self.new_writer() self._writer.__enter__() return self def __exit__( self, exc_type: Optional[Type[BaseException]], exc_value: Optional[BaseException], traceback: Optional[TracebackType], ) -> None: """Close the writer.""" if (self._added_files + self._existing_files + self._deleted_files) == 0: # This is just a guard to ensure that we don't write empty manifest files raise ValueError("An empty manifest file has been written") self.closed = True self._writer.__exit__(exc_type, exc_value, traceback) @abstractmethod def content(self) -> ManifestContent: ... @property @abstractmethod def version(self) -> TableVersion: ... @property def _meta(self) -> Dict[str, str]: return { "schema": self._schema.model_dump_json(), "partition-spec": to_json(self._spec.fields).decode("utf-8"), "partition-spec-id": str(self._spec.spec_id), "format-version": str(self.version), } def _with_partition(self, format_version: TableVersion) -> Schema: data_file_type = data_file_with_partition( format_version=format_version, partition_type=self._spec.partition_type(self._schema) ) return manifest_entry_schema_with_data_file(format_version=format_version, data_file=data_file_type) def new_writer(self) -> AvroOutputFile[ManifestEntry]: return AvroOutputFile[ManifestEntry]( output_file=self._output_file, file_schema=self._with_partition(self.version), record_schema=self._with_partition(DEFAULT_READ_VERSION), schema_name="manifest_entry", metadata=self._meta, ) @abstractmethod def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: ... def to_manifest_file(self) -> ManifestFile: """Return the manifest file.""" # once the manifest file is generated, no more entries can be added self.closed = True min_sequence_number = self._min_sequence_number or UNASSIGNED_SEQ return ManifestFile.from_args( manifest_path=self._output_file.location, manifest_length=len(self._writer.output_file), partition_spec_id=self._spec.spec_id, content=self.content(), sequence_number=UNASSIGNED_SEQ, min_sequence_number=min_sequence_number, added_snapshot_id=self._snapshot_id, added_files_count=self._added_files, existing_files_count=self._existing_files, deleted_files_count=self._deleted_files, added_rows_count=self._added_rows, existing_rows_count=self._existing_rows, deleted_rows_count=self._deleted_rows, partitions=construct_partition_summaries(self._spec, self._schema, self._partitions), key_metadata=None, ) def add_entry(self, entry: ManifestEntry) -> ManifestWriter: if self.closed: raise RuntimeError("Cannot add entry to closed manifest writer") if entry.status == ManifestEntryStatus.ADDED: self._added_files += 1 self._added_rows += entry.data_file.record_count elif entry.status == ManifestEntryStatus.EXISTING: self._existing_files += 1 self._existing_rows += entry.data_file.record_count elif entry.status == ManifestEntryStatus.DELETED: self._deleted_files += 1 self._deleted_rows += entry.data_file.record_count else: raise ValueError(f"Unknown entry: {entry.status}") self._partitions.append(entry.data_file.partition) if ( (entry.status == ManifestEntryStatus.ADDED or entry.status == ManifestEntryStatus.EXISTING) and entry.sequence_number is not None and (self._min_sequence_number is None or entry.sequence_number < self._min_sequence_number) ): self._min_sequence_number = entry.sequence_number self._writer.write_block([self.prepare_entry(entry)]) return self def add(self, entry: ManifestEntry) -> ManifestWriter: if entry.sequence_number is not None and entry.sequence_number >= 0: self.add_entry( ManifestEntry.from_args( snapshot_id=self._snapshot_id, sequence_number=entry.sequence_number, data_file=entry.data_file ) ) else: self.add_entry( ManifestEntry.from_args( status=ManifestEntryStatus.ADDED, snapshot_id=self._snapshot_id, data_file=entry.data_file ) ) return self def delete(self, entry: ManifestEntry) -> ManifestWriter: self.add_entry( ManifestEntry.from_args( status=ManifestEntryStatus.DELETED, snapshot_id=self._snapshot_id, sequence_number=entry.sequence_number, file_sequence_number=entry.file_sequence_number, data_file=entry.data_file, ) ) return self def existing(self, entry: ManifestEntry) -> ManifestWriter: self.add_entry( ManifestEntry.from_args( status=ManifestEntryStatus.EXISTING, snapshot_id=entry.snapshot_id, sequence_number=entry.sequence_number, file_sequence_number=entry.file_sequence_number, data_file=entry.data_file, ) ) return self class ManifestWriterV1(ManifestWriter): def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int): super().__init__( spec, schema, output_file, snapshot_id, ) def content(self) -> ManifestContent: return ManifestContent.DATA @property def version(self) -> TableVersion: return 1 def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: return entry class ManifestWriterV2(ManifestWriter): def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int): super().__init__(spec, schema, output_file, snapshot_id) def content(self) -> ManifestContent: return ManifestContent.DATA @property def version(self) -> TableVersion: return 2 @property def _meta(self) -> Dict[str, str]: return { **super()._meta, "content": "data", } def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: if entry.sequence_number is None: if entry.snapshot_id is not None and entry.snapshot_id != self._snapshot_id: raise ValueError(f"Found unassigned sequence number for an entry from snapshot: {entry.snapshot_id}") if entry.status != ManifestEntryStatus.ADDED: raise ValueError("Only entries with status ADDED can have null sequence number") return entry def write_manifest( format_version: TableVersion, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int ) -> ManifestWriter: if format_version == 1: return ManifestWriterV1(spec, schema, output_file, snapshot_id) elif format_version == 2: return ManifestWriterV2(spec, schema, output_file, snapshot_id) else: raise ValueError(f"Cannot write manifest for table version: {format_version}") class ManifestListWriter(ABC): _format_version: TableVersion _output_file: OutputFile _meta: Dict[str, str] _manifest_files: List[ManifestFile] _commit_snapshot_id: int _writer: AvroOutputFile[ManifestFile] def __init__(self, format_version: TableVersion, output_file: OutputFile, meta: Dict[str, Any]): self._format_version = format_version self._output_file = output_file self._meta = meta self._manifest_files = [] def __enter__(self) -> ManifestListWriter: """Open the writer for writing.""" self._writer = AvroOutputFile[ManifestFile]( output_file=self._output_file, record_schema=MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION], file_schema=MANIFEST_LIST_FILE_SCHEMAS[self._format_version], schema_name="manifest_file", metadata=self._meta, ) self._writer.__enter__() return self def __exit__( self, exc_type: Optional[Type[BaseException]], exc_value: Optional[BaseException], traceback: Optional[TracebackType], ) -> None: """Close the writer.""" self._writer.__exit__(exc_type, exc_value, traceback) return @abstractmethod def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: ... def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWriter: self._writer.write_block([self.prepare_manifest(manifest_file) for manifest_file in manifest_files]) return self class ManifestListWriterV1(ManifestListWriter): def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int]): super().__init__( format_version=1, output_file=output_file, meta={ "snapshot-id": str(snapshot_id), "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null", "format-version": "1", }, ) def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: if manifest_file.content != ManifestContent.DATA: raise ValidationError("Cannot store delete manifests in a v1 table") return manifest_file class ManifestListWriterV2(ManifestListWriter): _commit_snapshot_id: int _sequence_number: int def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int): super().__init__( format_version=2, output_file=output_file, meta={ "snapshot-id": str(snapshot_id), "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null", "sequence-number": str(sequence_number), "format-version": "2", }, ) self._commit_snapshot_id = snapshot_id self._sequence_number = sequence_number def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: wrapped_manifest_file = copy(manifest_file) if wrapped_manifest_file.sequence_number == UNASSIGNED_SEQ: # if the sequence number is being assigned here, then the manifest must be created by the current operation. # To validate this, check that the snapshot id matches the current commit if self._commit_snapshot_id != wrapped_manifest_file.added_snapshot_id: raise ValueError( f"Found unassigned sequence number for a manifest from snapshot: {self._commit_snapshot_id} != {wrapped_manifest_file.added_snapshot_id}" ) wrapped_manifest_file.sequence_number = self._sequence_number if wrapped_manifest_file.min_sequence_number == UNASSIGNED_SEQ: if self._commit_snapshot_id != wrapped_manifest_file.added_snapshot_id: raise ValueError( f"Found unassigned sequence number for a manifest from snapshot: {wrapped_manifest_file.added_snapshot_id}" ) # if the min sequence number is not determined, then there was no assigned sequence number for any file # written to the wrapped manifest. Replace the unassigned sequence number with the one for this commit wrapped_manifest_file.min_sequence_number = self._sequence_number return wrapped_manifest_file def write_manifest_list( format_version: TableVersion, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: Optional[int], ) -> ManifestListWriter: if format_version == 1: return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id) elif format_version == 2: if sequence_number is None: raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}") return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number) else: raise ValueError(f"Cannot write manifest list for table version: {format_version}")