pyiceberg/typedef.py (105 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from abc import abstractmethod
from datetime import date, datetime
from decimal import Decimal
from functools import lru_cache
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Generic,
    Literal,
    Optional,
    Protocol,
    Set,
    Tuple,
    TypeVar,
    Union,
    runtime_checkable,
)
from uuid import UUID

from pydantic import BaseModel, ConfigDict, RootModel
from typing_extensions import TypeAlias

if TYPE_CHECKING:
    from pyiceberg.types import StructType


class FrozenDict(Dict[Any, Any]):
    """A dict subclass that rejects item assignment and ``.update()``.

    Only ``__setitem__`` and ``update`` are overridden; both raise
    ``AttributeError``. Other mutating ``dict`` methods (``pop``,
    ``setdefault``, ``clear``, ``del d[k]``, ...) are NOT intercepted here.
    """

    def __setitem__(self, instance: Any, value: Any) -> None:
        """Assign a value to a FrozenDict."""
        raise AttributeError("FrozenDict does not support assignment")

    def update(self, *args: Any, **kwargs: Any) -> None:
        """Reject bulk updates of a FrozenDict."""
        raise AttributeError("FrozenDict does not support .update()")


UTF8 = "utf-8"

# Shared empty FrozenDict instance.
EMPTY_DICT = FrozenDict()

K = TypeVar("K")
V = TypeVar("V")


# from https://stackoverflow.com/questions/2912231/is-there-a-clever-way-to-pass-the-key-to-defaultdicts-default-factory
class KeyDefaultDict(Dict[K, V]):
    """A dict that builds missing values by calling a factory with the missing key."""

    def __init__(self, default_factory: Callable[[K], V]):
        """Create an empty dict that uses ``default_factory(key)`` for missing keys."""
        super().__init__()
        self.default_factory = default_factory

    def __missing__(self, key: K) -> V:
        """Define behavior if you access a non-existent key in a KeyDefaultDict."""
        val = self.default_factory(key)
        # Store the computed value so the factory runs at most once per key.
        self[key] = val
        return val


Identifier = Tuple[str, ...]
"""A tuple of strings representing a table identifier.

Each string in the tuple represents a part of the table's unique path. For example,
a table in a namespace might be identified as:

    ("namespace", "table_name")

Examples:
    >>> identifier: Identifier = ("namespace", "table_name")
"""

Properties = Dict[str, Any]
"""A dictionary type for properties in PyIceberg."""


RecursiveDict = Dict[str, Union[str, "RecursiveDict"]]
"""A recursive dictionary type for nested structures in PyIceberg."""

# Represents the literal value
L = TypeVar("L", str, bool, int, float, bytes, UUID, Decimal, datetime, date, covariant=True)


@runtime_checkable
class StructProtocol(Protocol):  # pragma: no cover
    """A generic protocol used by accessors to get and set at positions of an object."""

    @abstractmethod
    def __getitem__(self, pos: int) -> Any:
        """Fetch a value from a StructProtocol."""

    @abstractmethod
    def __setitem__(self, pos: int, value: Any) -> None:
        """Assign a value to a StructProtocol."""


class IcebergBaseModel(BaseModel):
    """
    This class extends the Pydantic BaseModel to set default values by overriding them.

    This is because we always want to set by_alias to True. In Python, the dash can't
    be used in variable names, and this is used throughout the Iceberg spec.

    The same goes for exclude_none, if a field is None we want to omit it from
    serialization, for example, the doc attribute on the NestedField object.
    Default non-null values will be serialized.

    This is recommended by Pydantic:
    https://pydantic-docs.helpmanual.io/usage/model_config/#change-behaviour-globally
    """

    model_config = ConfigDict(populate_by_name=True, frozen=True)

    def _exclude_private_properties(self, exclude: Optional[Set[str]] = None) -> Set[str]:
        """Return ``exclude`` extended with the underscore-prefixed keys of ``self.__dict__``."""
        # A small trick to exclude private properties. Properties are serialized by pydantic,
        # regardless if they start with an underscore.
        # This will look at the dict, and find the fields and exclude them
        private = {field for field in self.__dict__ if field.startswith("_") and field != "__root__"}
        return private.union(exclude or set())

    def model_dump(
        self, exclude_none: bool = True, exclude: Optional[Set[str]] = None, by_alias: bool = True, **kwargs: Any
    ) -> Dict[str, Any]:
        """Dump the model to a dict, defaulting ``exclude_none`` and ``by_alias`` to True.

        Underscore-prefixed entries of the instance ``__dict__`` are always added
        to ``exclude`` before delegating to ``BaseModel.model_dump``.
        """
        return super().model_dump(
            exclude_none=exclude_none, exclude=self._exclude_private_properties(exclude), by_alias=by_alias, **kwargs
        )

    def model_dump_json(
        self, exclude_none: bool = True, exclude: Optional[Set[str]] = None, by_alias: bool = True, **kwargs: Any
    ) -> str:
        """Dump the model to a JSON string, defaulting ``exclude_none`` and ``by_alias`` to True.

        Underscore-prefixed entries of the instance ``__dict__`` are always added
        to ``exclude`` before delegating to ``BaseModel.model_dump_json``.
        """
        return super().model_dump_json(
            exclude_none=exclude_none, exclude=self._exclude_private_properties(exclude), by_alias=by_alias, **kwargs
        )


T = TypeVar("T")


class IcebergRootModel(RootModel[T], Generic[T]):
    """
    This class extends the Pydantic RootModel and marks it as frozen.

    Unlike IcebergBaseModel, no serialization methods are overridden here;
    the only customization is ``frozen=True`` in the model config.

    Setting behaviour through the model config is recommended by Pydantic:
    https://pydantic-docs.helpmanual.io/usage/model_config/#change-behaviour-globally
    """

    model_config = ConfigDict(frozen=True)


@lru_cache
def _get_struct_fields(struct_type: StructType) -> Tuple[str, ...]:
    """Return the field names of ``struct_type`` in order (cached per struct)."""
    return tuple(field.name for field in struct_type.fields)


class Record(StructProtocol):
    """A struct whose values are reachable both by position and by field name.

    Field values are stored as instance attributes (in ``__dict__``), while the
    position -> name mapping lives in the ``_position_to_field_name`` slot.
    """

    __slots__ = ("_position_to_field_name",)
    _position_to_field_name: Tuple[str, ...]

    def __init__(self, *data: Any, struct: Optional[StructType] = None, **named_data: Any) -> None:
        """Build a Record from positional and/or keyword values.

        Field names are taken, in order of precedence, from ``struct``, from the
        keyword argument names, or generated as ``field1``, ``field2``, ...

        Args:
            *data: Values assigned by position.
            struct: Optional struct type whose field names define the positions.
            **named_data: Values assigned by field name.
        """
        if struct is not None:
            self._position_to_field_name = _get_struct_fields(struct)
        elif named_data:
            # Order of named_data is preserved (PEP 468) so this can be used to generate the position dict
            self._position_to_field_name = tuple(named_data)
        else:
            self._position_to_field_name = tuple(f"field{idx + 1}" for idx in range(len(data)))

        for idx, d in enumerate(data):
            self[idx] = d

        for field_name, d in named_data.items():
            setattr(self, field_name, d)

    def __setitem__(self, pos: int, value: Any) -> None:
        """Assign a value to a Record."""
        setattr(self, self._position_to_field_name[pos], value)

    def __getitem__(self, pos: int) -> Any:
        """Fetch a value from a Record."""
        return getattr(self, self._position_to_field_name[pos])

    def __eq__(self, other: Any) -> bool:
        """Return the equality of two instances of the Record class."""
        if not isinstance(other, Record):
            return False
        return self.__dict__ == other.__dict__

    def __repr__(self) -> str:
        """Return the string representation of the Record class."""
        fields = ", ".join(f"{key}={value!r}" for key, value in self.__dict__.items() if not key.startswith("_"))
        return f"{self.__class__.__name__}[{fields}]"

    def __len__(self) -> int:
        """Return the number of fields in the Record class."""
        return len(self._position_to_field_name)

    def __hash__(self) -> int:
        """Return hash value of the Record class."""
        # __eq__ compares __dict__, which ignores insertion order, so the hash must be
        # order-independent too. repr() is used so unhashable field values still work.
        return hash(frozenset((key, repr(value)) for key, value in self.__dict__.items() if not key.startswith("_")))


TableVersion: TypeAlias = Literal[1, 2, 3]