pyiceberg/typedef.py (94 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from abc import abstractmethod
from datetime import date, datetime, time
from decimal import Decimal
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Generic,
    List,
    Literal,
    Optional,
    Protocol,
    Set,
    Tuple,
    TypeVar,
    Union,
    runtime_checkable,
)
from uuid import UUID

from pydantic import BaseModel, ConfigDict, RootModel
from typing_extensions import Self, TypeAlias

if TYPE_CHECKING:
    from pyiceberg.types import StructType


class FrozenDict(Dict[Any, Any]):
    """A dict whose contents cannot be changed after construction.

    Every in-place mutator raises ``AttributeError``. Read access, iteration
    and ``copy()`` behave exactly like a regular ``dict``.
    """

    def __setitem__(self, instance: Any, value: Any) -> None:
        """Assign a value to a FrozenDict."""
        raise AttributeError("FrozenDict does not support assignment")

    def update(self, *args: Any, **kwargs: Any) -> None:
        """Reject bulk updates of a FrozenDict."""
        raise AttributeError("FrozenDict does not support .update()")

    # The mutators below are implemented natively by ``dict`` and do not route
    # through ``__setitem__``/``update``, so each one has to be blocked on its
    # own; otherwise a shared instance such as EMPTY_DICT could still be
    # modified through them.
    def __delitem__(self, key: Any) -> None:
        """Reject deleting a key from a FrozenDict."""
        raise AttributeError("FrozenDict does not support deletion")

    def __ior__(self, other: Any) -> Self:
        """Reject the in-place merge operator (``|=``) on a FrozenDict."""
        raise AttributeError("FrozenDict does not support in-place merge")

    def setdefault(self, key: Any, default: Any = None) -> Any:
        """Reject inserting a default value into a FrozenDict."""
        raise AttributeError("FrozenDict does not support .setdefault()")

    def pop(self, *args: Any) -> Any:
        """Reject removing a key from a FrozenDict."""
        raise AttributeError("FrozenDict does not support .pop()")

    def popitem(self) -> Tuple[Any, Any]:
        """Reject removing an arbitrary item from a FrozenDict."""
        raise AttributeError("FrozenDict does not support .popitem()")

    def clear(self) -> None:
        """Reject emptying a FrozenDict."""
        raise AttributeError("FrozenDict does not support .clear()")


UTF8 = "utf-8"

# Module-level empty FrozenDict instance.
EMPTY_DICT = FrozenDict()

K = TypeVar("K")
V = TypeVar("V")


# from https://stackoverflow.com/questions/2912231/is-there-a-clever-way-to-pass-the-key-to-defaultdicts-default-factory
class KeyDefaultDict(Dict[K, V]):
    """A dict that builds missing values by calling a factory with the missing key."""

    def __init__(self, default_factory: Callable[[K], V]):
        """Create an empty dict that uses ``default_factory(key)`` for missing keys."""
        super().__init__()
        self.default_factory = default_factory

    def __missing__(self, key: K) -> V:
        """Define behavior if you access a non-existent key in a KeyDefaultDict.

        The factory is called with the key, and the result is stored under
        that key before it is returned.
        """
        val = self.default_factory(key)
        self[key] = val
        return val


Identifier = Tuple[str, ...]
"""A tuple of strings representing a table identifier.

Each string in the tuple represents a part of the table's unique path. For example,
a table in a namespace might be identified as:

    ("namespace", "table_name")

Examples:
    >>> identifier: Identifier = ("namespace", "table_name")
"""

Properties = Dict[str, Any]
"""A dictionary type for properties in PyIceberg."""


RecursiveDict = Dict[str, Union[str, "RecursiveDict"]]
"""A recursive dictionary type for nested structures in PyIceberg."""

# Represents the literal value
L = TypeVar("L", str, bool, int, float, bytes, UUID, Decimal, datetime, date, time, covariant=True)


@runtime_checkable
class StructProtocol(Protocol):  # pragma: no cover
    """A generic protocol used by accessors to get and set at positions of an object."""

    @abstractmethod
    def __getitem__(self, pos: int) -> Any:
        """Fetch a value from a StructProtocol."""

    @abstractmethod
    def __setitem__(self, pos: int, value: Any) -> None:
        """Assign a value to a StructProtocol."""


class IcebergBaseModel(BaseModel):
    """
    This class extends the Pydantic BaseModel to set default values by overriding them.

    This is because we always want to set by_alias to True. In Python, the dash can't
    be used in variable names, and this is used throughout the Iceberg spec.

    The same goes for exclude_none, if a field is None we want to omit it from
    serialization, for example, the doc attribute on the NestedField object.
    Default non-null values will be serialized.

    This is recommended by Pydantic:
    https://pydantic-docs.helpmanual.io/usage/model_config/#change-behaviour-globally
    """

    model_config = ConfigDict(populate_by_name=True, frozen=True)

    def _exclude_private_properties(self, exclude: Optional[Set[str]] = None) -> Set[str]:
        """Return ``exclude`` extended with the underscore-prefixed names in ``self.__dict__``.

        A small trick to exclude private properties. Properties are serialized by pydantic,
        regardless if they start with an underscore. This looks at the instance dict,
        finds those names and adds them to the exclusion set. ``__root__`` is never added.
        """
        private_names = {name for name in self.__dict__ if name.startswith("_") and name != "__root__"}
        return private_names.union(exclude or set())

    def model_dump(
        self, exclude_none: bool = True, exclude: Optional[Set[str]] = None, by_alias: bool = True, **kwargs: Any
    ) -> Dict[str, Any]:
        """Dump the model to a dict, with ``exclude_none`` and ``by_alias`` defaulting to True.

        Underscore-prefixed names found on the instance are always added to ``exclude``.
        """
        return super().model_dump(
            exclude_none=exclude_none, exclude=self._exclude_private_properties(exclude), by_alias=by_alias, **kwargs
        )

    def model_dump_json(
        self, exclude_none: bool = True, exclude: Optional[Set[str]] = None, by_alias: bool = True, **kwargs: Any
    ) -> str:
        """Dump the model to a JSON string, with ``exclude_none`` and ``by_alias`` defaulting to True.

        Underscore-prefixed names found on the instance are always added to ``exclude``.
        """
        return super().model_dump_json(
            exclude_none=exclude_none, exclude=self._exclude_private_properties(exclude), by_alias=by_alias, **kwargs
        )


T = TypeVar("T")


class IcebergRootModel(RootModel[T], Generic[T]):
    """
    This class extends the Pydantic RootModel and marks it as frozen.

    Unlike IcebergBaseModel, it does not override model_dump or model_dump_json;
    the only configuration it sets is ``frozen=True``.
    """

    model_config = ConfigDict(frozen=True)


class Record(StructProtocol):
    """A StructProtocol implementation that keeps its values in a list, addressed by position."""

    __slots__ = ("_data",)
    _data: List[Any]

    @classmethod
    def _bind(cls, struct: StructType, **arguments: Any) -> Self:
        """Build a Record with one value per field of ``struct``, in field order.

        A field's value is taken from the keyword argument named after the field;
        when no such argument is given, the field's ``initial_default`` is used.
        """
        return cls(*[arguments[field.name] if field.name in arguments else field.initial_default for field in struct.fields])

    def __init__(self, *data: Any) -> None:
        """Create a Record holding the given positional values."""
        self._data = list(data)

    def __setitem__(self, pos: int, value: Any) -> None:
        """Assign a value to a Record."""
        self._data[pos] = value

    def __getitem__(self, pos: int) -> Any:
        """Fetch a value from a Record."""
        return self._data[pos]

    def __eq__(self, other: Any) -> bool:
        """Return the equality of two instances of the Record class."""
        return self._data == other._data if isinstance(other, Record) else False

    def __repr__(self) -> str:
        """Return the string representation of the Record class."""
        return f"{self.__class__.__name__}[{', '.join(str(v) for v in self._data)}]"

    def __len__(self) -> int:
        """Return the number of fields in the Record class."""
        return len(self._data)

    def __hash__(self) -> int:
        """Return hash value of the Record class."""
        # Hashes the string rendering instead of the values themselves.
        # NOTE(review): presumably so records holding unhashable values can
        # still be hashed - confirm before changing.
        return hash(str(self))


# Literal type restricted to the integers 1, 2 and 3.
TableVersion: TypeAlias = Literal[1, 2, 3]