pyiceberg/manifest.py (887 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import math
from abc import ABC, abstractmethod
from copy import copy
from enum import Enum
from types import TracebackType
from typing import (
Any,
Dict,
Iterator,
List,
Literal,
Optional,
Tuple,
Type,
Union,
)
from cachetools import LRUCache, cached
from cachetools.keys import hashkey
from pydantic_core import to_json
from pyiceberg.avro.file import AvroFile, AvroOutputFile
from pyiceberg.conversions import to_bytes
from pyiceberg.exceptions import ValidationError
from pyiceberg.io import FileIO, InputFile, OutputFile
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.typedef import Record, TableVersion
from pyiceberg.types import (
BinaryType,
BooleanType,
IntegerType,
ListType,
LongType,
MapType,
NestedField,
PrimitiveType,
StringType,
StructType,
)
# Sentinel stored in sequence-number slots that have not been assigned yet.
UNASSIGNED_SEQ = -1
# 64 MiB; used as the write default of the v1 ``block_size_in_bytes`` field.
DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024
# Format version whose schemas are used for reading and as the default record layout.
DEFAULT_READ_VERSION: Literal[2] = 2
INITIAL_SEQUENCE_NUMBER = 0
class DataFileContent(int, Enum):
    """Integer codes stored in the ``content`` field of a data file record."""

    DATA = 0
    POSITION_DELETES = 1
    EQUALITY_DELETES = 2

    def __repr__(self) -> str:
        """Render the member as ``DataFileContent.<NAME>``."""
        return "DataFileContent." + self.name
class ManifestContent(int, Enum):
    """Integer codes stored in the ``content`` field of a manifest-list record."""

    DATA = 0
    DELETES = 1

    def __repr__(self) -> str:
        """Render the member as ``ManifestContent.<NAME>``."""
        return "ManifestContent." + self.name
class ManifestEntryStatus(int, Enum):
    """Integer codes stored in the ``status`` field of a manifest entry."""

    EXISTING = 0
    ADDED = 1
    DELETED = 2

    def __repr__(self) -> str:
        """Render the member as ``ManifestEntryStatus.<NAME>``."""
        return "ManifestEntryStatus." + self.name
class FileFormat(str, Enum):
    """File format names; lookup by value is case-insensitive via ``_missing_``."""

    AVRO = "AVRO"
    PARQUET = "PARQUET"
    ORC = "ORC"
    PUFFIN = "PUFFIN"

    @classmethod
    def _missing_(cls, value: object) -> Union[None, str]:
        # Called by Enum when the plain value lookup fails: retry with the upper-cased text.
        wanted = str(value).upper()
        return next((candidate for candidate in cls if candidate.value == wanted), None)

    def __repr__(self) -> str:
        """Render the member as ``FileFormat.<NAME>``."""
        return "FileFormat." + self.name
# Struct layout of the ``data_file`` record, keyed by table format version.
# The ``partition`` field (id 102) is declared with an empty struct here; see
# ``data_file_with_partition`` for the variant bound to a concrete partition type.
DATA_FILE_TYPE: Dict[int, StructType] = {
    1: StructType(
        NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
        NestedField(
            field_id=101,
            name="file_format",
            field_type=StringType(),
            required=True,
            doc="File format name: avro, orc, or parquet",
        ),
        NestedField(
            field_id=102,
            name="partition",
            field_type=StructType(),
            required=True,
            doc="Partition data tuple, schema based on the partition spec",
        ),
        NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
        NestedField(
            field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes"
        ),
        NestedField(
            field_id=105,
            name="block_size_in_bytes",
            field_type=LongType(),
            required=True,
            doc="Deprecated. Always write a default in v1. Do not write in v2.",
            write_default=DEFAULT_BLOCK_SIZE,
        ),
        NestedField(
            field_id=108,
            name="column_sizes",
            field_type=MapType(key_id=117, key_type=IntegerType(), value_id=118, value_type=LongType()),
            required=False,
            doc="Map of column id to total size on disk",
        ),
        NestedField(
            field_id=109,
            name="value_counts",
            field_type=MapType(key_id=119, key_type=IntegerType(), value_id=120, value_type=LongType()),
            required=False,
            doc="Map of column id to total count, including null and NaN",
        ),
        NestedField(
            field_id=110,
            name="null_value_counts",
            field_type=MapType(key_id=121, key_type=IntegerType(), value_id=122, value_type=LongType()),
            required=False,
            doc="Map of column id to null value count",
        ),
        NestedField(
            field_id=137,
            name="nan_value_counts",
            field_type=MapType(key_id=138, key_type=IntegerType(), value_id=139, value_type=LongType()),
            required=False,
            doc="Map of column id to number of NaN values in the column",
        ),
        NestedField(
            field_id=125,
            name="lower_bounds",
            field_type=MapType(key_id=126, key_type=IntegerType(), value_id=127, value_type=BinaryType()),
            required=False,
            doc="Map of column id to lower bound",
        ),
        NestedField(
            field_id=128,
            name="upper_bounds",
            field_type=MapType(key_id=129, key_type=IntegerType(), value_id=130, value_type=BinaryType()),
            required=False,
            doc="Map of column id to upper bound",
        ),
        NestedField(
            field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob"
        ),
        NestedField(
            field_id=132,
            name="split_offsets",
            field_type=ListType(element_id=133, element_type=LongType(), element_required=True),
            required=False,
            doc="Splittable offsets",
        ),
        NestedField(field_id=140, name="sort_order_id", field_type=IntegerType(), required=False, doc="Sort order ID"),
    ),
    2: StructType(
        NestedField(
            field_id=134,
            name="content",
            field_type=IntegerType(),
            required=True,
            # Previously carried the file_format doc by copy-paste; the codes match DataFileContent.
            doc="Contents of the file: 0=data, 1=position deletes, 2=equality deletes",
            initial_default=DataFileContent.DATA,
        ),
        NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
        NestedField(
            field_id=101,
            name="file_format",
            field_type=StringType(),
            required=True,
            doc="File format name: avro, orc, or parquet",
        ),
        NestedField(
            field_id=102,
            name="partition",
            field_type=StructType(),
            required=True,
            doc="Partition data tuple, schema based on the partition spec",
        ),
        NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
        NestedField(
            field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes"
        ),
        NestedField(
            field_id=108,
            name="column_sizes",
            field_type=MapType(key_id=117, key_type=IntegerType(), value_id=118, value_type=LongType()),
            required=False,
            doc="Map of column id to total size on disk",
        ),
        NestedField(
            field_id=109,
            name="value_counts",
            field_type=MapType(key_id=119, key_type=IntegerType(), value_id=120, value_type=LongType()),
            required=False,
            doc="Map of column id to total count, including null and NaN",
        ),
        NestedField(
            field_id=110,
            name="null_value_counts",
            field_type=MapType(key_id=121, key_type=IntegerType(), value_id=122, value_type=LongType()),
            required=False,
            doc="Map of column id to null value count",
        ),
        NestedField(
            field_id=137,
            name="nan_value_counts",
            field_type=MapType(key_id=138, key_type=IntegerType(), value_id=139, value_type=LongType()),
            required=False,
            doc="Map of column id to number of NaN values in the column",
        ),
        NestedField(
            field_id=125,
            name="lower_bounds",
            field_type=MapType(key_id=126, key_type=IntegerType(), value_id=127, value_type=BinaryType()),
            required=False,
            doc="Map of column id to lower bound",
        ),
        NestedField(
            field_id=128,
            name="upper_bounds",
            field_type=MapType(key_id=129, key_type=IntegerType(), value_id=130, value_type=BinaryType()),
            required=False,
            doc="Map of column id to upper bound",
        ),
        NestedField(
            field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob"
        ),
        NestedField(
            field_id=132,
            name="split_offsets",
            field_type=ListType(element_id=133, element_type=LongType(), element_required=True),
            required=False,
            doc="Splittable offsets",
        ),
        NestedField(
            field_id=135,
            name="equality_ids",
            field_type=ListType(element_id=136, element_type=LongType(), element_required=True),
            required=False,
            doc="Field ids used to determine row equality in equality delete files.",
        ),
        NestedField(
            field_id=140,
            name="sort_order_id",
            field_type=IntegerType(),
            required=False,
            doc="ID representing sort order for this file",
        ),
    ),
}
def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType:
    """Return the data file struct for a format version with ``partition`` bound to a concrete type.

    Args:
        partition_type: Struct whose fields describe the partition tuple.
        format_version: Key into ``DATA_FILE_TYPE`` selecting the base layout.

    Returns:
        A new struct equal to the base layout except that field 102 uses the given partition fields.
    """
    # Rebuild every partition field from id/name/type/required only.
    partition_fields = []
    for partition_field in partition_type.fields:
        partition_fields.append(
            NestedField(
                field_id=partition_field.field_id,
                name=partition_field.name,
                field_type=partition_field.field_type,
                required=partition_field.required,
            )
        )
    partition_struct = StructType(*partition_fields)

    # Copy the base layout, swapping in the bound partition struct for field id 102.
    resolved_fields = []
    for base_field in DATA_FILE_TYPE[format_version].fields:
        if base_field.field_id == 102:
            resolved_fields.append(
                NestedField(
                    field_id=102,
                    name="partition",
                    field_type=partition_struct,
                    required=True,
                    doc="Partition data tuple, schema based on the partition spec",
                )
            )
        else:
            resolved_fields.append(base_field)
    return StructType(*resolved_fields)
class DataFile(Record):
    """Record view over a data file entry.

    The positional accessors below follow the field order of ``DATA_FILE_TYPE[2]``.
    """

    @classmethod
    def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> DataFile:
        """Bind keyword arguments against the data file struct of the given format version."""
        return super()._bind(DATA_FILE_TYPE[_table_format_version], **arguments)

    @property
    def content(self) -> DataFileContent:
        """Position 0: ``content``."""
        return self._data[0]

    @property
    def file_path(self) -> str:
        """Position 1: ``file_path``."""
        return self._data[1]

    @property
    def file_format(self) -> FileFormat:
        """Position 2: ``file_format``."""
        return self._data[2]

    @property
    def partition(self) -> Record:
        """Position 3: ``partition``."""
        return self._data[3]

    @property
    def record_count(self) -> int:
        """Position 4: ``record_count``."""
        return self._data[4]

    @property
    def file_size_in_bytes(self) -> int:
        """Position 5: ``file_size_in_bytes``."""
        return self._data[5]

    @property
    def column_sizes(self) -> Dict[int, int]:
        """Position 6: ``column_sizes``."""
        return self._data[6]

    @property
    def value_counts(self) -> Dict[int, int]:
        """Position 7: ``value_counts``."""
        return self._data[7]

    @property
    def null_value_counts(self) -> Dict[int, int]:
        """Position 8: ``null_value_counts``."""
        return self._data[8]

    @property
    def nan_value_counts(self) -> Dict[int, int]:
        """Position 9: ``nan_value_counts``."""
        return self._data[9]

    @property
    def lower_bounds(self) -> Dict[int, bytes]:
        """Position 10: ``lower_bounds``."""
        return self._data[10]

    @property
    def upper_bounds(self) -> Dict[int, bytes]:
        """Position 11: ``upper_bounds``."""
        return self._data[11]

    @property
    def key_metadata(self) -> Optional[bytes]:
        """Position 12: ``key_metadata``."""
        return self._data[12]

    @property
    def split_offsets(self) -> Optional[List[int]]:
        """Position 13: ``split_offsets``."""
        return self._data[13]

    @property
    def equality_ids(self) -> Optional[List[int]]:
        """Position 14: ``equality_ids``."""
        return self._data[14]

    @property
    def sort_order_id(self) -> Optional[int]:
        """Position 15: ``sort_order_id``."""
        return self._data[15]

    # Spec ID should not be stored in the file
    _spec_id: int

    @property
    def spec_id(self) -> int:
        """Partition spec id kept on the instance, outside the positional record data."""
        return self._spec_id

    @spec_id.setter
    def spec_id(self, value: int) -> None:
        self._spec_id = value

    def __setattr__(self, name: str, value: Any) -> None:
        """Assign a key/value to a DataFile."""
        # file_format arrives as a string, so it is looked up by member name in the enum.
        coerced = FileFormat[value] if name == "file_format" else value
        super().__setattr__(name, coerced)

    def __hash__(self) -> int:
        """Hash on the file path, matching ``__eq__``."""
        return hash(self.file_path)

    def __eq__(self, other: Any) -> bool:
        """Compare the datafile with another object.

        Two data files are equal when their file paths match; any other type is unequal.
        """
        if not isinstance(other, DataFile):
            return False
        return self.file_path == other.file_path
# Manifest entry schema per table format version; v2 adds the two sequence-number fields
# and makes ``snapshot_id`` optional.
MANIFEST_ENTRY_SCHEMAS = {
    1: Schema(
        NestedField(field_id=0, name="status", field_type=IntegerType(), required=True),
        NestedField(field_id=1, name="snapshot_id", field_type=LongType(), required=True),
        NestedField(field_id=2, name="data_file", field_type=DATA_FILE_TYPE[1], required=True),
    ),
    2: Schema(
        NestedField(field_id=0, name="status", field_type=IntegerType(), required=True),
        NestedField(field_id=1, name="snapshot_id", field_type=LongType(), required=False),
        NestedField(field_id=3, name="sequence_number", field_type=LongType(), required=False),
        NestedField(field_id=4, name="file_sequence_number", field_type=LongType(), required=False),
        NestedField(field_id=2, name="data_file", field_type=DATA_FILE_TYPE[2], required=True),
    ),
}

# The same schemas converted to structs once, at import time.
MANIFEST_ENTRY_SCHEMAS_STRUCT = {
    version: entry_schema.as_struct() for version, entry_schema in MANIFEST_ENTRY_SCHEMAS.items()
}
def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file: StructType) -> Schema:
    """Return the manifest entry schema of a format version with field 2 typed as ``data_file``.

    Args:
        format_version: Key into ``MANIFEST_ENTRY_SCHEMAS``.
        data_file: Struct to use as the type of the ``data_file`` field.
    """
    entry_fields = []
    for entry_field in MANIFEST_ENTRY_SCHEMAS[format_version].fields:
        if entry_field.field_id == 2:
            # Field id 2 is the data_file slot; replace it, keep everything else as-is.
            entry_field = NestedField(2, "data_file", data_file, required=True)
        entry_fields.append(entry_field)
    return Schema(*entry_fields)
class ManifestEntry(Record):
    """Record view over a manifest entry.

    The positional accessors follow the field order of ``MANIFEST_ENTRY_SCHEMAS[2]``:
    status (0), snapshot_id (1), sequence_number (2), file_sequence_number (3), data_file (4).
    """

    @classmethod
    def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> ManifestEntry:
        """Bind keyword arguments against the manifest entry struct of the given format version."""
        return super()._bind(**arguments, struct=MANIFEST_ENTRY_SCHEMAS_STRUCT[_table_format_version])

    @property
    def status(self) -> ManifestEntryStatus:
        """Position 0: ``status``."""
        return self._data[0]

    @status.setter
    def status(self, value: ManifestEntryStatus) -> None:
        self._data[0] = value

    @property
    def snapshot_id(self) -> Optional[int]:
        """Position 1: ``snapshot_id``."""
        return self._data[1]

    @snapshot_id.setter
    def snapshot_id(self, value: int) -> None:
        # Must target position 1; writing position 0 would clobber the entry status.
        self._data[1] = value

    @property
    def sequence_number(self) -> Optional[int]:
        """Position 2: ``sequence_number``."""
        return self._data[2]

    @sequence_number.setter
    def sequence_number(self, value: int) -> None:
        self._data[2] = value

    @property
    def file_sequence_number(self) -> Optional[int]:
        """Position 3: ``file_sequence_number``."""
        return self._data[3]

    @file_sequence_number.setter
    def file_sequence_number(self, value: int) -> None:
        self._data[3] = value

    @property
    def data_file(self) -> DataFile:
        """Position 4: ``data_file``."""
        return self._data[4]

    @data_file.setter
    def data_file(self, value: DataFile) -> None:
        self._data[4] = value
# Struct layout of one per-field partition summary inside a manifest-list record.
PARTITION_FIELD_SUMMARY_TYPE = StructType(
    NestedField(field_id=509, name="contains_null", field_type=BooleanType(), required=True),
    NestedField(field_id=518, name="contains_nan", field_type=BooleanType(), required=False),
    NestedField(field_id=510, name="lower_bound", field_type=BinaryType(), required=False),
    NestedField(field_id=511, name="upper_bound", field_type=BinaryType(), required=False),
)
class PartitionFieldSummary(Record):
    """Record view over a partition field summary, laid out as ``PARTITION_FIELD_SUMMARY_TYPE``."""

    @classmethod
    def from_args(cls, **arguments: Any) -> PartitionFieldSummary:
        """Bind keyword arguments against ``PARTITION_FIELD_SUMMARY_TYPE``."""
        return super()._bind(struct=PARTITION_FIELD_SUMMARY_TYPE, **arguments)

    @property
    def contains_null(self) -> bool:
        """Position 0: ``contains_null``."""
        return self._data[0]

    @property
    def contains_nan(self) -> Optional[bool]:
        """Position 1: ``contains_nan``."""
        return self._data[1]

    @property
    def lower_bound(self) -> Optional[bytes]:
        """Position 2: ``lower_bound``."""
        return self._data[2]

    @property
    def upper_bound(self) -> Optional[bytes]:
        """Position 3: ``upper_bound``."""
        return self._data[3]
class PartitionFieldStats:
    """Accumulate null/NaN flags and min/max over the values of one partition field."""

    _type: PrimitiveType
    _contains_null: bool
    _contains_nan: bool
    _min: Optional[Any]
    _max: Optional[Any]

    def __init__(self, iceberg_type: PrimitiveType) -> None:
        self._type = iceberg_type
        self._contains_null = False
        self._contains_nan = False
        self._min = None
        self._max = None

    def to_summary(self) -> PartitionFieldSummary:
        """Build a summary record; bounds are serialized with ``to_bytes`` or left as None."""
        lower = None if self._min is None else to_bytes(self._type, self._min)
        upper = None if self._max is None else to_bytes(self._type, self._max)
        return PartitionFieldSummary(self._contains_null, self._contains_nan, lower, upper)

    def update(self, value: Any) -> None:
        """Fold one partition value into the running statistics."""
        if value is None:
            self._contains_null = True
            return
        if isinstance(value, float) and math.isnan(value):
            self._contains_nan = True
            return
        if self._min is None:
            # First regular value seeds both bounds.
            self._min = value
            self._max = value
            return
        self._max = max(self._max, value)
        self._min = min(self._min, value)
def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partitions: List[Record]) -> List[PartitionFieldSummary]:
    """Summarize the given partition records, one summary per partition field.

    Args:
        spec: Partition spec; its partition type for ``schema`` defines the fields.
        schema: Schema passed to ``spec.partition_type``.
        partitions: Partition records, indexed by partition field position.

    Raises:
        ValueError: If a partition field type is not primitive while a record is processed.
    """
    partition_field_types = [partition_field.field_type for partition_field in spec.partition_type(schema).fields]
    stats_per_field = [PartitionFieldStats(partition_field_type) for partition_field_type in partition_field_types]
    for record in partitions:
        for position, (partition_field_type, stats) in enumerate(zip(partition_field_types, stats_per_field)):
            if not isinstance(partition_field_type, PrimitiveType):
                raise ValueError(f"Expected a primitive type for the partition field, got {partition_field_type}")
            stats.update(record[position])
    return [collected.to_summary() for collected in stats_per_field]
# Manifest-list record schema per table format version. v2 adds ``content`` and the two
# sequence-number fields and marks the count fields as required.
MANIFEST_LIST_FILE_SCHEMAS: Dict[int, Schema] = {
    1: Schema(
        NestedField(field_id=500, name="manifest_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
        NestedField(field_id=501, name="manifest_length", field_type=LongType(), required=True),
        NestedField(field_id=502, name="partition_spec_id", field_type=IntegerType(), required=True),
        NestedField(field_id=503, name="added_snapshot_id", field_type=LongType(), required=True),
        NestedField(field_id=504, name="added_files_count", field_type=IntegerType(), required=False),
        NestedField(field_id=505, name="existing_files_count", field_type=IntegerType(), required=False),
        NestedField(field_id=506, name="deleted_files_count", field_type=IntegerType(), required=False),
        NestedField(field_id=512, name="added_rows_count", field_type=LongType(), required=False),
        NestedField(field_id=513, name="existing_rows_count", field_type=LongType(), required=False),
        NestedField(field_id=514, name="deleted_rows_count", field_type=LongType(), required=False),
        NestedField(
            field_id=507,
            name="partitions",
            field_type=ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True),
            required=False,
        ),
        NestedField(field_id=519, name="key_metadata", field_type=BinaryType(), required=False),
    ),
    2: Schema(
        NestedField(field_id=500, name="manifest_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
        NestedField(field_id=501, name="manifest_length", field_type=LongType(), required=True),
        NestedField(field_id=502, name="partition_spec_id", field_type=IntegerType(), required=True),
        NestedField(
            field_id=517, name="content", field_type=IntegerType(), required=True, initial_default=ManifestContent.DATA
        ),
        NestedField(field_id=515, name="sequence_number", field_type=LongType(), required=True, initial_default=0),
        NestedField(field_id=516, name="min_sequence_number", field_type=LongType(), required=True, initial_default=0),
        NestedField(field_id=503, name="added_snapshot_id", field_type=LongType(), required=True),
        NestedField(field_id=504, name="added_files_count", field_type=IntegerType(), required=True),
        NestedField(field_id=505, name="existing_files_count", field_type=IntegerType(), required=True),
        NestedField(field_id=506, name="deleted_files_count", field_type=IntegerType(), required=True),
        NestedField(field_id=512, name="added_rows_count", field_type=LongType(), required=True),
        NestedField(field_id=513, name="existing_rows_count", field_type=LongType(), required=True),
        NestedField(field_id=514, name="deleted_rows_count", field_type=LongType(), required=True),
        NestedField(
            field_id=507,
            name="partitions",
            field_type=ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True),
            required=False,
        ),
        NestedField(field_id=519, name="key_metadata", field_type=BinaryType(), required=False),
    ),
}

# The same schemas converted to structs once, at import time.
MANIFEST_LIST_FILE_STRUCTS = {
    version: list_schema.as_struct() for version, list_schema in MANIFEST_LIST_FILE_SCHEMAS.items()
}

POSITIONAL_DELETE_SCHEMA = Schema(
    NestedField(field_id=2147483546, name="file_path", field_type=StringType()),
    NestedField(field_id=2147483545, name="pos", field_type=IntegerType()),
)
class ManifestFile(Record):
    """Record view over one manifest-list row.

    The positional accessors follow the field order of ``MANIFEST_LIST_FILE_SCHEMAS[2]``.
    """

    @classmethod
    def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> ManifestFile:
        """Bind keyword arguments against the manifest-list struct of the given format version."""
        # Bind against the pre-computed struct (not the Schema), consistent with ManifestEntry.from_args.
        return super()._bind(**arguments, struct=MANIFEST_LIST_FILE_STRUCTS[_table_format_version])

    @property
    def manifest_path(self) -> str:
        """Position 0: ``manifest_path``."""
        return self._data[0]

    @property
    def manifest_length(self) -> int:
        """Position 1: ``manifest_length``."""
        return self._data[1]

    @property
    def partition_spec_id(self) -> int:
        """Position 2: ``partition_spec_id``."""
        return self._data[2]

    @property
    def content(self) -> ManifestContent:
        """Position 3: ``content``."""
        return self._data[3]

    @property
    def sequence_number(self) -> int:
        """Position 4: ``sequence_number``."""
        return self._data[4]

    @sequence_number.setter
    def sequence_number(self, value: int) -> None:
        self._data[4] = value

    @property
    def min_sequence_number(self) -> int:
        """Position 5: ``min_sequence_number``."""
        return self._data[5]

    @min_sequence_number.setter
    def min_sequence_number(self, value: int) -> None:
        self._data[5] = value

    @property
    def added_snapshot_id(self) -> Optional[int]:
        """Position 6: ``added_snapshot_id``."""
        return self._data[6]

    @property
    def added_files_count(self) -> Optional[int]:
        """Position 7: ``added_files_count``."""
        return self._data[7]

    @property
    def existing_files_count(self) -> Optional[int]:
        """Position 8: ``existing_files_count``."""
        return self._data[8]

    @property
    def deleted_files_count(self) -> Optional[int]:
        """Position 9: ``deleted_files_count``."""
        return self._data[9]

    @property
    def added_rows_count(self) -> Optional[int]:
        """Position 10: ``added_rows_count``."""
        return self._data[10]

    @property
    def existing_rows_count(self) -> Optional[int]:
        """Position 11: ``existing_rows_count``."""
        return self._data[11]

    @property
    def deleted_rows_count(self) -> Optional[int]:
        """Position 12: ``deleted_rows_count``."""
        return self._data[12]

    @property
    def partitions(self) -> Optional[List[PartitionFieldSummary]]:
        """Position 13: ``partitions``."""
        return self._data[13]

    @property
    def key_metadata(self) -> Optional[bytes]:
        """Position 14: ``key_metadata``."""
        return self._data[14]

    def has_added_files(self) -> bool:
        """Return True when the added-files count is unknown (None) or positive."""
        return self.added_files_count is None or self.added_files_count > 0

    def has_existing_files(self) -> bool:
        """Return True when the existing-files count is unknown (None) or positive."""
        return self.existing_files_count is None or self.existing_files_count > 0

    def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List[ManifestEntry]:
        """
        Read the manifest entries from the manifest file.

        Args:
            io: The FileIO to fetch the file.
            discard_deleted: Filter on live entries.

        Returns:
            A list of manifest entries, each with properties inherited from this manifest.
        """
        input_file = io.new_input(self.manifest_path)
        with AvroFile[ManifestEntry](
            input_file,
            MANIFEST_ENTRY_SCHEMAS[DEFAULT_READ_VERSION],
            read_types={-1: ManifestEntry, 2: DataFile},
            read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent},
        ) as reader:
            # Materialize inside the ``with`` so reading finishes before the file is closed.
            return [
                _inherit_from_manifest(entry, self)
                for entry in reader
                if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
            ]
@cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list))
def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]:
    """Read the manifests of a manifest list, cached by the manifest list location only.

    The result is a tuple so the cached value cannot be modified by callers.
    """
    manifest_list_file = io.new_input(manifest_list)
    manifest_files = read_manifest_list(manifest_list_file)
    return tuple(manifest_files)
def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
    """
    Lazily yield the manifest files recorded in a manifest list.

    Args:
        input_file: The input file where the stream can be read from.

    Returns:
        An iterator of ManifestFiles that are part of the list.
    """
    reader_context = AvroFile[ManifestFile](
        input_file,
        MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION],
        read_types={-1: ManifestFile, 508: PartitionFieldSummary},
        read_enums={517: ManifestContent},
    )
    with reader_context as reader:
        for manifest_file in reader:
            yield manifest_file
def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> ManifestEntry:
    """
    Fill in entry properties that are inherited from the manifest file.

    The properties that will be inherited are:
    - sequence numbers
    - partition spec id.

    More information about inheriting sequence numbers: https://iceberg.apache.org/spec/#sequence-number-inheritance

    Args:
        entry: The manifest entry.
        manifest: The manifest file.

    Returns:
        The same manifest entry, updated in place.
    """
    # The snapshot_id is required in V1, inherit with V2 when null
    if entry.snapshot_id is None:
        entry.snapshot_id = manifest.added_snapshot_id

    manifest_sequence_number = manifest.sequence_number
    # v1 tables do not persist sequence numbers, so a manifest sequence number of 0 can be
    # inherited safely; in v2 tables inheritance applies iff the entry status is ADDED.
    may_inherit = manifest_sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED
    if may_inherit:
        if entry.sequence_number is None:
            entry.sequence_number = manifest_sequence_number
        if entry.file_sequence_number is None:
            # Only available in V2, always 0 in V1
            entry.file_sequence_number = manifest_sequence_number

    # Inherit partition spec id.
    entry.data_file.spec_id = manifest.partition_spec_id
    return entry
class ManifestWriter(ABC):
    """Base class for writing manifest entries into a manifest file.

    Used as a context manager: entering opens the underlying Avro writer, and entries are
    appended through ``add``, ``delete``, ``existing`` or ``add_entry``. File and row counts,
    the partition tuples and the minimum sequence number are tracked while writing and are
    reported by ``to_manifest_file``.
    """

    closed: bool
    _spec: PartitionSpec
    _schema: Schema
    _output_file: OutputFile
    _writer: AvroOutputFile[ManifestEntry]
    _snapshot_id: int
    _added_files: int
    _added_rows: int
    _existing_files: int
    _existing_rows: int
    _deleted_files: int
    _deleted_rows: int
    _min_sequence_number: Optional[int]
    _partitions: List[Record]
    _reused_entry_wrapper: ManifestEntry

    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int) -> None:
        self.closed = False
        self._spec = spec
        self._schema = schema
        self._output_file = output_file
        self._snapshot_id = snapshot_id
        self._added_files = 0
        self._added_rows = 0
        self._existing_files = 0
        self._existing_rows = 0
        self._deleted_files = 0
        self._deleted_rows = 0
        self._min_sequence_number = None
        self._partitions = []

    def __enter__(self) -> ManifestWriter:
        """Open the writer."""
        self._writer = self.new_writer()
        self._writer.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Close the writer.

        Raises:
            ValueError: If no entry was written.
        """
        # NOTE(review): this raises before the underlying writer is exited, also when an
        # exception is already propagating out of the ``with`` body - confirm that is intended.
        if (self._added_files + self._existing_files + self._deleted_files) == 0:
            # This is just a guard to ensure that we don't write empty manifest files
            raise ValueError("An empty manifest file has been written")
        self.closed = True
        self._writer.__exit__(exc_type, exc_value, traceback)

    @abstractmethod
    def content(self) -> ManifestContent: ...

    @property
    @abstractmethod
    def version(self) -> TableVersion: ...

    @property
    def _meta(self) -> Dict[str, str]:
        """Metadata key/values passed to the Avro writer."""
        return {
            "schema": self._schema.model_dump_json(),
            "partition-spec": to_json(self._spec.fields).decode("utf-8"),
            "partition-spec-id": str(self._spec.spec_id),
            "format-version": str(self.version),
        }

    def _with_partition(self, format_version: TableVersion) -> Schema:
        """Return the manifest entry schema of ``format_version`` bound to this writer's partition type."""
        data_file_type = data_file_with_partition(
            format_version=format_version, partition_type=self._spec.partition_type(self._schema)
        )
        return manifest_entry_schema_with_data_file(format_version=format_version, data_file=data_file_type)

    def new_writer(self) -> AvroOutputFile[ManifestEntry]:
        """Create the Avro writer: file schema from ``self.version``, record schema from the read version."""
        return AvroOutputFile[ManifestEntry](
            output_file=self._output_file,
            file_schema=self._with_partition(self.version),
            record_schema=self._with_partition(DEFAULT_READ_VERSION),
            schema_name="manifest_entry",
            metadata=self._meta,
        )

    @abstractmethod
    def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: ...

    def to_manifest_file(self) -> ManifestFile:
        """Return the manifest file."""
        # once the manifest file is generated, no more entries can be added
        self.closed = True
        # Compare against None explicitly: a tracked minimum of 0 is a real sequence number
        # and must not be replaced by the unassigned sentinel.
        min_sequence_number = self._min_sequence_number if self._min_sequence_number is not None else UNASSIGNED_SEQ
        return ManifestFile.from_args(
            manifest_path=self._output_file.location,
            manifest_length=len(self._writer.output_file),
            partition_spec_id=self._spec.spec_id,
            content=self.content(),
            sequence_number=UNASSIGNED_SEQ,
            min_sequence_number=min_sequence_number,
            added_snapshot_id=self._snapshot_id,
            added_files_count=self._added_files,
            existing_files_count=self._existing_files,
            deleted_files_count=self._deleted_files,
            added_rows_count=self._added_rows,
            existing_rows_count=self._existing_rows,
            deleted_rows_count=self._deleted_rows,
            partitions=construct_partition_summaries(self._spec, self._schema, self._partitions),
            key_metadata=None,
        )

    def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
        """Write one entry as-is (after ``prepare_entry``) and update the running statistics.

        Raises:
            RuntimeError: If the writer is closed.
            ValueError: If the entry status is not ADDED, EXISTING or DELETED.
        """
        if self.closed:
            raise RuntimeError("Cannot add entry to closed manifest writer")
        if entry.status == ManifestEntryStatus.ADDED:
            self._added_files += 1
            self._added_rows += entry.data_file.record_count
        elif entry.status == ManifestEntryStatus.EXISTING:
            self._existing_files += 1
            self._existing_rows += entry.data_file.record_count
        elif entry.status == ManifestEntryStatus.DELETED:
            self._deleted_files += 1
            self._deleted_rows += entry.data_file.record_count
        else:
            raise ValueError(f"Unknown entry: {entry.status}")

        self._partitions.append(entry.data_file.partition)

        # Only live entries (ADDED / EXISTING) with a known sequence number feed the minimum.
        if (
            (entry.status == ManifestEntryStatus.ADDED or entry.status == ManifestEntryStatus.EXISTING)
            and entry.sequence_number is not None
            and (self._min_sequence_number is None or entry.sequence_number < self._min_sequence_number)
        ):
            self._min_sequence_number = entry.sequence_number

        self._writer.write_block([self.prepare_entry(entry)])
        return self

    def add(self, entry: ManifestEntry) -> ManifestWriter:
        """Write the entry's data file with status ADDED under this writer's snapshot id.

        A non-negative sequence number on the incoming entry is carried over; otherwise it is left unset.
        """
        if entry.sequence_number is not None and entry.sequence_number >= 0:
            self.add_entry(
                ManifestEntry.from_args(
                    # The status must be set here as well, otherwise add_entry rejects the entry.
                    status=ManifestEntryStatus.ADDED,
                    snapshot_id=self._snapshot_id,
                    sequence_number=entry.sequence_number,
                    data_file=entry.data_file,
                )
            )
        else:
            self.add_entry(
                ManifestEntry.from_args(
                    status=ManifestEntryStatus.ADDED, snapshot_id=self._snapshot_id, data_file=entry.data_file
                )
            )
        return self

    def delete(self, entry: ManifestEntry) -> ManifestWriter:
        """Write the entry with status DELETED under this writer's snapshot id, keeping its sequence numbers."""
        self.add_entry(
            ManifestEntry.from_args(
                status=ManifestEntryStatus.DELETED,
                snapshot_id=self._snapshot_id,
                sequence_number=entry.sequence_number,
                file_sequence_number=entry.file_sequence_number,
                data_file=entry.data_file,
            )
        )
        return self

    def existing(self, entry: ManifestEntry) -> ManifestWriter:
        """Write the entry with status EXISTING, keeping its own snapshot id and sequence numbers."""
        self.add_entry(
            ManifestEntry.from_args(
                status=ManifestEntryStatus.EXISTING,
                snapshot_id=entry.snapshot_id,
                sequence_number=entry.sequence_number,
                file_sequence_number=entry.file_sequence_number,
                data_file=entry.data_file,
            )
        )
        return self
class ManifestWriterV1(ManifestWriter):
    """Manifest writer for format version 1: data content, entries written unchanged."""

    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
        super().__init__(spec, schema, output_file, snapshot_id)

    def content(self) -> ManifestContent:
        """Return ``ManifestContent.DATA``."""
        return ManifestContent.DATA

    @property
    def version(self) -> TableVersion:
        """Format version written by this writer."""
        return 1

    def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
        """Return the entry untouched."""
        return entry
class ManifestWriterV2(ManifestWriter):
    """Manifest writer for format version 2: data content, with sequence-number validation."""

    def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
        super().__init__(spec, schema, output_file, snapshot_id)

    def content(self) -> ManifestContent:
        """Return ``ManifestContent.DATA``."""
        return ManifestContent.DATA

    @property
    def version(self) -> TableVersion:
        """Format version written by this writer."""
        return 2

    @property
    def _meta(self) -> Dict[str, str]:
        """Base metadata plus the ``content`` key."""
        metadata = dict(super()._meta)
        metadata["content"] = "data"
        return metadata

    def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
        """Validate entries without a sequence number and return the entry unchanged.

        Raises:
            ValueError: If the sequence number is None and the entry belongs to another
                snapshot, or its status is not ADDED.
        """
        if entry.sequence_number is not None:
            return entry
        entry_snapshot_id = entry.snapshot_id
        if entry_snapshot_id is not None and entry_snapshot_id != self._snapshot_id:
            raise ValueError(f"Found unassigned sequence number for an entry from snapshot: {entry_snapshot_id}")
        if entry.status != ManifestEntryStatus.ADDED:
            raise ValueError("Only entries with status ADDED can have null sequence number")
        return entry
def write_manifest(
    format_version: TableVersion, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int
) -> ManifestWriter:
    """Create the manifest writer that matches the table format version.

    Raises:
        ValueError: If the format version is neither 1 nor 2.
    """
    if format_version == 1:
        return ManifestWriterV1(spec, schema, output_file, snapshot_id)
    if format_version == 2:
        return ManifestWriterV2(spec, schema, output_file, snapshot_id)
    raise ValueError(f"Cannot write manifest for table version: {format_version}")
class ManifestListWriter(ABC):
    """Base class for writing manifest files into a manifest list.

    Used as a context manager; subclasses decide how each manifest is prepared before writing.
    """

    _format_version: TableVersion
    _output_file: OutputFile
    _meta: Dict[str, str]
    _manifest_files: List[ManifestFile]
    _commit_snapshot_id: int
    _writer: AvroOutputFile[ManifestFile]

    def __init__(self, format_version: TableVersion, output_file: OutputFile, meta: Dict[str, Any]):
        self._format_version = format_version
        self._output_file = output_file
        self._meta = meta
        self._manifest_files = []

    def __enter__(self) -> ManifestListWriter:
        """Open the writer for writing."""
        # Records use the read-version layout; the file is written with this writer's version.
        record_schema = MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION]
        file_schema = MANIFEST_LIST_FILE_SCHEMAS[self._format_version]
        self._writer = AvroOutputFile[ManifestFile](
            output_file=self._output_file,
            record_schema=record_schema,
            file_schema=file_schema,
            schema_name="manifest_file",
            metadata=self._meta,
        )
        self._writer.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Close the writer."""
        self._writer.__exit__(exc_type, exc_value, traceback)

    @abstractmethod
    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: ...

    def add_manifests(self, manifest_files: List[ManifestFile]) -> ManifestListWriter:
        """Prepare every manifest file and write them together as one block."""
        prepared = [self.prepare_manifest(manifest_file) for manifest_file in manifest_files]
        self._writer.write_block(prepared)
        return self
class ManifestListWriterV1(ManifestListWriter):
    """Manifest list writer for format version 1; only data manifests are accepted."""

    def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int]):
        parent_id_text = "null" if parent_snapshot_id is None else str(parent_snapshot_id)
        metadata = {
            "snapshot-id": str(snapshot_id),
            "parent-snapshot-id": parent_id_text,
            "format-version": "1",
        }
        super().__init__(format_version=1, output_file=output_file, meta=metadata)

    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
        """Return the manifest unchanged.

        Raises:
            ValidationError: If the manifest content is not DATA.
        """
        if manifest_file.content == ManifestContent.DATA:
            return manifest_file
        raise ValidationError("Cannot store delete manifests in a v1 table")
class ManifestListWriterV2(ManifestListWriter):
    """Manifest list writer for format version 2; assigns sequence numbers still unassigned."""

    _commit_snapshot_id: int
    _sequence_number: int

    def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id: Optional[int], sequence_number: int):
        parent_id_text = "null" if parent_snapshot_id is None else str(parent_snapshot_id)
        metadata = {
            "snapshot-id": str(snapshot_id),
            "parent-snapshot-id": parent_id_text,
            "sequence-number": str(sequence_number),
            "format-version": "2",
        }
        super().__init__(format_version=2, output_file=output_file, meta=metadata)
        self._commit_snapshot_id = snapshot_id
        self._sequence_number = sequence_number

    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
        """Return a copy of the manifest with unassigned sequence numbers set to this commit's.

        Raises:
            ValueError: If a sequence number is unassigned but the manifest was added by a
                snapshot other than the one being committed.
        """
        prepared = copy(manifest_file)
        # An unassigned sequence number is only valid for manifests created by the current
        # operation, i.e. when the added snapshot id matches the commit snapshot id.
        created_by_this_commit = self._commit_snapshot_id == prepared.added_snapshot_id

        if prepared.sequence_number == UNASSIGNED_SEQ:
            if not created_by_this_commit:
                raise ValueError(
                    f"Found unassigned sequence number for a manifest from snapshot: {self._commit_snapshot_id} != {prepared.added_snapshot_id}"
                )
            prepared.sequence_number = self._sequence_number

        if prepared.min_sequence_number == UNASSIGNED_SEQ:
            if not created_by_this_commit:
                raise ValueError(
                    f"Found unassigned sequence number for a manifest from snapshot: {prepared.added_snapshot_id}"
                )
            # No file written to the manifest had an assigned sequence number, so the
            # minimum becomes the sequence number of this commit.
            prepared.min_sequence_number = self._sequence_number

        return prepared
def write_manifest_list(
    format_version: TableVersion,
    output_file: OutputFile,
    snapshot_id: int,
    parent_snapshot_id: Optional[int],
    sequence_number: Optional[int],
) -> ManifestListWriter:
    """Create the manifest list writer that matches the table format version.

    Raises:
        ValueError: If the format version is neither 1 nor 2, or if it is 2 and no
            sequence number is given.
    """
    if format_version == 1:
        return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id)
    if format_version != 2:
        raise ValueError(f"Cannot write manifest list for table version: {format_version}")
    if sequence_number is None:
        raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}")
    return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number)