pyiceberg/typedef.py (94 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from abc import abstractmethod
from datetime import date, datetime, time
from decimal import Decimal
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generic,
List,
Literal,
Optional,
Protocol,
Set,
Tuple,
TypeVar,
Union,
runtime_checkable,
)
from uuid import UUID
from pydantic import BaseModel, ConfigDict, RootModel
from typing_extensions import Self, TypeAlias
if TYPE_CHECKING:
from pyiceberg.types import StructType
class FrozenDict(Dict[Any, Any]):
    """A dictionary whose contents are fixed at construction time.

    The initial items are supplied through the regular ``dict`` constructor.
    Afterwards every in-place mutation (item assignment, item deletion,
    ``update``, ``|=``, ``pop``, ``popitem``, ``clear`` and ``setdefault``)
    raises ``AttributeError``, so a shared instance cannot be altered by
    accident.

    Read access (lookup, iteration, ``get``, ``copy``, ``|``) behaves exactly
    like a normal ``dict``.
    """

    def __setitem__(self, instance: Any, value: Any) -> None:
        """Assign a value to a FrozenDict."""
        raise AttributeError("FrozenDict does not support assignment")

    def __delitem__(self, instance: Any) -> None:
        """Delete a key from a FrozenDict."""
        raise AttributeError("FrozenDict does not support deletion")

    def __ior__(self, other: Any) -> Any:
        """Merge another mapping into a FrozenDict in place (``|=``)."""
        # dict.__ior__ does not go through the overridden update(), so it has
        # to be blocked explicitly.
        raise AttributeError("FrozenDict does not support .update()")

    def update(self, *args: Any, **kwargs: Any) -> None:
        raise AttributeError("FrozenDict does not support .update()")

    def pop(self, *args: Any, **kwargs: Any) -> Any:
        raise AttributeError("FrozenDict does not support .pop()")

    def popitem(self) -> Any:
        raise AttributeError("FrozenDict does not support .popitem()")

    def clear(self) -> None:
        raise AttributeError("FrozenDict does not support .clear()")

    def setdefault(self, *args: Any, **kwargs: Any) -> Any:
        raise AttributeError("FrozenDict does not support .setdefault()")
# Codec name for UTF-8, kept as a module-level constant.
UTF8 = "utf-8"

# A single shared, empty FrozenDict instance. FrozenDict rejects item
# assignment and .update(), which presumably makes this safe to use as a
# default argument value -- confirm against the call sites.
EMPTY_DICT = FrozenDict()
K = TypeVar("K")
V = TypeVar("V")


# from https://stackoverflow.com/questions/2912231/is-there-a-clever-way-to-pass-the-key-to-defaultdicts-default-factory
class KeyDefaultDict(Dict[K, V]):
    """Dictionary that creates missing values from the key being looked up.

    The factory passed to the constructor is called with the missing key
    itself. The value it returns is stored under that key and returned, so
    the factory runs at most once per key for ``d[key]`` lookups.
    """

    def __init__(self, default_factory: Callable[[K], V]):
        """Create an empty dictionary that uses ``default_factory`` for missing keys."""
        self.default_factory = default_factory
        super().__init__()

    def __missing__(self, key: K) -> V:
        """Define behavior if you access a non-existent key in a KeyDefaultDict."""
        # Build the value once, cache it under the key, and hand it back.
        created = self[key] = self.default_factory(key)
        return created
# Variable-length tuple of strings.
Identifier = Tuple[str, ...]
"""A tuple of strings representing a table identifier.
Each string in the tuple represents a part of the table's unique path. For example,
a table in a namespace might be identified as:
("namespace", "table_name")
Examples:
>>> identifier: Identifier = ("namespace", "table_name")
"""

# String keys mapped to values of any type.
Properties = Dict[str, Any]
"""A dictionary type for properties in PyIceberg."""

# String keys whose values are either a string or another RecursiveDict
# (the forward reference makes the alias self-referential).
RecursiveDict = Dict[str, Union[str, "RecursiveDict"]]
"""A recursive dictionary type for nested structures in PyIceberg."""

# Represents the literal value: a TypeVar constrained to the listed Python
# value types and declared covariant.
L = TypeVar("L", str, bool, int, float, bytes, UUID, Decimal, datetime, date, time, covariant=True)
@runtime_checkable
class StructProtocol(Protocol):  # pragma: no cover
    """Structural interface for objects whose values are accessed by position.

    Accessors use this protocol to read and write individual positions of an
    object. Because the protocol is ``runtime_checkable``, an ``isinstance``
    check only verifies that both methods are present, not their signatures.
    """

    @abstractmethod
    def __setitem__(self, pos: int, value: Any) -> None:
        """Assign a value to a StructProtocol."""
        ...

    @abstractmethod
    def __getitem__(self, pos: int) -> Any:
        """Fetch a value from a StructProtocol."""
        ...
class IcebergBaseModel(BaseModel):
    """
    Pydantic ``BaseModel`` with serialization defaults suited to the Iceberg spec.

    ``model_dump`` and ``model_dump_json`` are overridden so that, unless the
    caller says otherwise:

    * ``by_alias`` is True: the Iceberg spec uses dashes in field names, which
      are not valid in Python identifiers, so the aliases are what must be
      serialized.
    * ``exclude_none`` is True: fields that are None are omitted from the
      output (for example the doc attribute on the NestedField object), while
      non-null default values are still serialized.

    The model configuration allows population by field name and makes
    instances frozen.
    """

    model_config = ConfigDict(populate_by_name=True, frozen=True)

    def _exclude_private_properties(self, exclude: Optional[Set[str]] = None) -> Set[str]:
        """Return ``exclude`` extended with every underscore-prefixed name in ``self.__dict__``."""
        # Properties are serialized by pydantic even when their name starts
        # with an underscore, so collect those names from the instance dict
        # and exclude them explicitly. "__root__" is deliberately kept
        # (presumably a leftover from pydantic v1 custom root types).
        hidden = {name for name in self.__dict__ if name.startswith("_") and name != "__root__"}
        if exclude:
            hidden.update(exclude)
        return hidden

    def model_dump(
        self, exclude_none: bool = True, exclude: Optional[Set[str]] = None, by_alias: bool = True, **kwargs: Any
    ) -> Dict[str, Any]:
        """Serialize to a dict using the Iceberg defaults and without private properties."""
        combined_exclude = self._exclude_private_properties(exclude)
        return super().model_dump(exclude_none=exclude_none, exclude=combined_exclude, by_alias=by_alias, **kwargs)

    def model_dump_json(
        self, exclude_none: bool = True, exclude: Optional[Set[str]] = None, by_alias: bool = True, **kwargs: Any
    ) -> str:
        """Serialize to a JSON string using the Iceberg defaults and without private properties."""
        combined_exclude = self._exclude_private_properties(exclude)
        return super().model_dump_json(exclude_none=exclude_none, exclude=combined_exclude, by_alias=by_alias, **kwargs)
T = TypeVar("T")


class IcebergRootModel(RootModel[T], Generic[T]):
    """
    Generic Pydantic ``RootModel`` base whose instances are frozen.

    The type parameter ``T`` is the type of the wrapped root value. The only
    customisation made here is the frozen model configuration; unlike
    ``IcebergBaseModel``, this class does not override ``model_dump`` or
    ``model_dump_json``, so the ``by_alias`` and ``exclude_none`` defaults of
    Pydantic itself apply.
    """

    model_config = ConfigDict(frozen=True)
class Record(StructProtocol):
    """List-backed implementation of ``StructProtocol``.

    The values passed to the constructor are stored in order and can be read
    or replaced by position.
    """

    __slots__ = ("_data",)
    _data: List[Any]

    @classmethod
    def _bind(cls, struct: StructType, **arguments: Any) -> Self:
        """Create a record whose positions follow the order of ``struct.fields``.

        Each field takes the keyword argument matching its name when one is
        given, and the field's ``initial_default`` otherwise.
        """
        values: List[Any] = []
        for field in struct.fields:
            name = field.name
            values.append(arguments[name] if name in arguments else field.initial_default)
        return cls(*values)

    def __init__(self, *data: Any) -> None:
        """Store the positional values in a new list."""
        self._data = [*data]

    def __getitem__(self, pos: int) -> Any:
        """Fetch a value from a Record."""
        return self._data[pos]

    def __setitem__(self, pos: int, value: Any) -> None:
        """Assign a value to a Record."""
        self._data[pos] = value

    def __len__(self) -> int:
        """Return the number of fields in the Record class."""
        return len(self._data)

    def __eq__(self, other: Any) -> bool:
        """Return the equality of two instances of the Record class."""
        if not isinstance(other, Record):
            return False
        return self._data == other._data

    def __hash__(self) -> int:
        """Return hash value of the Record class."""
        # The hash is derived from the string form, so it also works when the
        # stored values themselves are unhashable.
        # NOTE(review): values that compare equal but print differently
        # (e.g. 1 and 1.0) produce equal records with different hashes.
        return hash(str(self))

    def __repr__(self) -> str:
        """Return the string representation of the Record class."""
        rendered = ", ".join(map(str, self._data))
        return f"{self.__class__.__name__}[{rendered}]"
# The only accepted table version values are the integer literals 1, 2 and 3
# (presumably the Iceberg table format versions -- confirm against the spec).
TableVersion: TypeAlias = Literal[1, 2, 3]