pyiceberg/avro/reader.py (277 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Classes for building the Reader tree.

Constructing a reader tree from the schema makes it easy
to decouple the reader implementation from the schema.

The reader tree can be changed in such a way that the
read schema is different, while respecting the read schema.
"""

from __future__ import annotations

from abc import abstractmethod
from dataclasses import dataclass
from dataclasses import field as dataclassfield
from decimal import Decimal
from typing import (
    Any,
    Callable,
    List,
    Mapping,
    Optional,
    Tuple,
)
from uuid import UUID

from pyiceberg.avro.decoder import BinaryDecoder
from pyiceberg.typedef import StructProtocol
from pyiceberg.types import StructType
from pyiceberg.utils.decimal import bytes_to_decimal, decimal_required_bytes
from pyiceberg.utils.lazydict import LazyDict
from pyiceberg.utils.singleton import Singleton


def _skip_map_array(decoder: BinaryDecoder, skip_entry: Callable[[], None]) -> None:
    """Skips over an array or map.

    Both the array and map are encoded similar, and we can reuse
    the logic of skipping in an efficient way.

    From the Avro spec:

    Maps (and arrays) are encoded as a series of blocks.
    Each block consists of a long count value, followed by that many key/value pairs in the case of a map,
    and followed by that many array items in the case of an array. A block with count zero indicates the
    end of the map. Each item is encoded per the map's value schema.

    If a block's count is negative, its absolute value is used, and the count is followed immediately by a
    long block size indicating the number of bytes in the block. This block size permits fast skipping
    through data, e.g., when projecting a record to a subset of its fields.

    Args:
        decoder:
            The decoder that reads the types from the underlying data.
        skip_entry:
            Function to skip over the underlying data, element in case of an array, and the
            key/value in the case of a map.
    """
    block_count = decoder.read_int()
    while block_count != 0:
        if block_count < 0:
            # The length in bytes is encoded, so we can skip over the whole block right away
            block_size = decoder.read_int()
            decoder.skip(block_size)
        else:
            # No byte size available: every entry has to be skipped individually
            for _ in range(block_count):
                skip_entry()
        block_count = decoder.read_int()


class Reader(Singleton):
    """Base class of the reader tree.

    Every reader can either decode one value from the decoder (`read`)
    or advance the decoder past one value without materializing it (`skip`).
    """

    @abstractmethod
    def read(self, decoder: BinaryDecoder) -> Any: ...

    @abstractmethod
    def skip(self, decoder: BinaryDecoder) -> None: ...

    def __repr__(self) -> str:
        """Return the string representation of the Reader class."""
        return f"{self.__class__.__name__}()"


class NoneReader(Reader):
    """Reader that never touches the decoder and always yields None."""

    def read(self, _: BinaryDecoder) -> None:
        return None

    def skip(self, decoder: BinaryDecoder) -> None:
        return None


class DefaultReader(Reader):
    """Reader that yields a fixed default value without consuming any data."""

    __slots__ = ("default_value",)
    default_value: Any

    def __init__(self, default_value: Any) -> None:
        self.default_value = default_value

    def read(self, _: BinaryDecoder) -> Any:
        return self.default_value

    def skip(self, decoder: BinaryDecoder) -> None:
        # Nothing is stored in the stream for this value, so there is nothing to skip
        pass


class BooleanReader(Reader):
    """Reads a boolean from the stream."""

    def read(self, decoder: BinaryDecoder) -> bool:
        return decoder.read_boolean()

    def skip(self, decoder: BinaryDecoder) -> None:
        decoder.skip_boolean()


class IntegerReader(Reader):
    """Longs and ints are encoded the same way, and there is no long in Python."""

    def read(self, decoder: BinaryDecoder) -> int:
        return decoder.read_int()

    def skip(self, decoder: BinaryDecoder) -> None:
        decoder.skip_int()


class FloatReader(Reader):
    """Reads a float from the stream."""

    def read(self, decoder: BinaryDecoder) -> float:
        return decoder.read_float()

    def skip(self, decoder: BinaryDecoder) -> None:
        decoder.skip_float()


class DoubleReader(Reader):
    """Reads a double from the stream."""

    def read(self, decoder: BinaryDecoder) -> float:
        return decoder.read_double()

    def skip(self, decoder: BinaryDecoder) -> None:
        decoder.skip_double()


class DateReader(IntegerReader):
    """Reads a day granularity date from the stream.

    The number of days from 1 January 1970.
    """


class TimeReader(IntegerReader):
    """Reads a microsecond granularity time from the stream.

    Long is decoded as an integer which represents
    the number of microseconds from midnight.
    """


class TimestampReader(IntegerReader):
    """Reads a microsecond granularity timestamp from the stream.

    Long is decoded as python integer which represents
    the number of microseconds from the unix epoch, 1 January 1970.
    """


class TimestampNanoReader(IntegerReader):
    """Reads a nanosecond granularity timestamp from the stream.

    Long is decoded as python integer which represents
    the number of nanoseconds from the unix epoch, 1 January 1970.
    """


class TimestamptzReader(IntegerReader):
    """Reads a microsecond granularity timestamptz from the stream.

    Long is decoded as python integer which represents
    the number of microseconds from the unix epoch, 1 January 1970.

    Adjusted to UTC.
    """


class TimestamptzNanoReader(IntegerReader):
    """Reads a nanosecond granularity timestamptz from the stream.

    Long is decoded as python integer which represents
    the number of nanoseconds from the unix epoch, 1 January 1970.

    Adjusted to UTC.
    """


class StringReader(Reader):
    """Reads a UTF-8 string from the stream."""

    def read(self, decoder: BinaryDecoder) -> str:
        return decoder.read_utf8()

    def skip(self, decoder: BinaryDecoder) -> None:
        decoder.skip_utf8()


class UUIDReader(Reader):
    """Reads a UUID that is stored as 16 raw bytes."""

    def read(self, decoder: BinaryDecoder) -> UUID:
        return UUID(bytes=decoder.read(16))

    def skip(self, decoder: BinaryDecoder) -> None:
        decoder.skip(16)


class UnknownReader(Reader):
    """Reader that yields None and consumes nothing from the decoder."""

    def read(self, decoder: BinaryDecoder) -> None:
        return None

    def skip(self, decoder: BinaryDecoder) -> None:
        pass


@dataclass(frozen=True)
class FixedReader(Reader):
    """Reads a fixed number of raw bytes; the number is given at construction."""

    _len: int = dataclassfield()

    def read(self, decoder: BinaryDecoder) -> bytes:
        return decoder.read(len(self))

    def skip(self, decoder: BinaryDecoder) -> None:
        decoder.skip(len(self))

    def __len__(self) -> int:
        """Return the length of an instance of the FixedReader class."""
        return self._len

    def __repr__(self) -> str:
        """Return the string representation of the FixedReader class."""
        return f"FixedReader({self._len})"


class BinaryReader(Reader):
    """Read a binary value.

    First reads an integer, to get the length of the binary value,
    then reads the binary field itself.
    """

    def read(self, decoder: BinaryDecoder) -> bytes:
        return decoder.read_bytes()

    def skip(self, decoder: BinaryDecoder) -> None:
        decoder.skip_bytes()


@dataclass(frozen=True, init=False)
class DecimalReader(Reader):
    """Reads a value as a decimal.

    Decimal bytes are decoded as signed short, int or long depending on the
    size of bytes.

    The value occupies a fixed number of bytes (`_length`), which is derived
    once from the precision through `decimal_required_bytes`.
    """

    precision: int = dataclassfield()
    scale: int = dataclassfield()
    _length: int

    def __init__(self, precision: int, scale: int):
        # The dataclass is frozen, so the attributes have to be set through object.__setattr__
        object.__setattr__(self, "precision", precision)
        object.__setattr__(self, "scale", scale)
        object.__setattr__(self, "_length", decimal_required_bytes(precision))

    def read(self, decoder: BinaryDecoder) -> Decimal:
        return bytes_to_decimal(decoder.read(self._length), self.scale)

    def skip(self, decoder: BinaryDecoder) -> None:
        # `read` consumes exactly `_length` bytes without a length prefix, so `skip`
        # must advance by the same amount. Using the length-prefixed `skip_bytes`
        # would interpret the value itself as a length and desynchronize the stream.
        decoder.skip(self._length)

    def __repr__(self) -> str:
        """Return the string representation of the DecimalReader class."""
        return f"DecimalReader({self.precision}, {self.scale})"


@dataclass(frozen=True)
class OptionReader(Reader):
    """Reads an optional value: a union index followed by the value when it is present."""

    option: Reader = dataclassfield()

    def read(self, decoder: BinaryDecoder) -> Optional[Any]:
        # For the Iceberg spec it is required to set the default value to null
        # From https://iceberg.apache.org/spec/#avro
        # Optional fields must always set the Avro field default value to null.
        #
        # This means that null has to come first:
        # https://avro.apache.org/docs/current/spec.html
        # type of the default value must match the first element of the union.
        # This is enforced in the schema conversion, which happens prior
        # to building the reader tree
        if decoder.read_int() > 0:
            return self.option.read(decoder)
        return None

    def skip(self, decoder: BinaryDecoder) -> None:
        # Only a non-null union branch carries a payload that has to be skipped
        if decoder.read_int() > 0:
            self.option.skip(decoder)


class StructReader(Reader):
    """Reads a struct by running the field readers in order.

    Each entry of `field_readers` is a `(position, reader)` pair. When the position
    is None the field is skipped in the stream, otherwise the decoded value is
    stored at that position of the struct produced by `create_struct`.
    """

    __slots__ = (
        "field_readers",
        "create_struct",
        "struct",
        "_field_reader_functions",
        "_hash",
        "_max_pos",
    )
    field_readers: Tuple[Tuple[Optional[int], Reader], ...]
    create_struct: Callable[..., StructProtocol]
    struct: StructType
    # NOTE(review): this is an assignment, not an annotation, so it defines a class-level
    # type alias. It looks like it was meant to annotate `_field_reader_functions`;
    # kept as-is for backward compatibility.
    field_reader_functions = Tuple[Tuple[Optional[str], int, Optional[Callable[[BinaryDecoder], Any]]], ...]

    def __init__(
        self,
        field_readers: Tuple[Tuple[Optional[int], Reader], ...],
        create_struct: Callable[..., StructProtocol],
        struct: StructType,
    ) -> None:
        """Build the struct reader and pre-resolve the per-field callbacks.

        Args:
            field_readers: Pairs of target position (None to skip the field) and reader.
            create_struct: Factory that produces the struct that is filled by `read`.
            struct: The struct type that belongs to this reader.

        Raises:
            ValueError: When `create_struct()` does not return a StructProtocol.
        """
        self.field_readers = field_readers
        self.create_struct = create_struct
        # TODO: Implement struct-reuse
        self.struct = struct

        if not isinstance(self.create_struct(), StructProtocol):
            raise ValueError(f"Incompatible with StructProtocol: {self.create_struct}")

        # Resolve the bound method once per field, so `read` does not have to
        # decide between reading and skipping for every record.
        reading_callbacks: List[Tuple[Optional[int], Callable[[BinaryDecoder], Any]]] = []
        max_pos = -1
        for pos, field in field_readers:
            if pos is not None:
                reading_callbacks.append((pos, field.read))
                max_pos = max(max_pos, pos)
            else:
                reading_callbacks.append((None, field.skip))

        self._field_reader_functions = tuple(reading_callbacks)
        self._hash = hash(self._field_reader_functions)
        # Number of slots needed to hold the highest position that is written
        self._max_pos = 1 + max_pos

    def read(self, decoder: BinaryDecoder) -> StructProtocol:
        # TODO: Implement struct-reuse
        struct = self.create_struct(*[None] * self._max_pos)
        for pos, field_reader in self._field_reader_functions:
            if pos is not None:
                struct[pos] = field_reader(decoder)  # later: pass reuse in here
            else:
                # The callback is the skip function of the field, the result is discarded
                field_reader(decoder)

        return struct

    def skip(self, decoder: BinaryDecoder) -> None:
        for _, field in self.field_readers:
            field.skip(decoder)

    def __eq__(self, other: Any) -> bool:
        """Return the equality of two instances of the StructReader class."""
        return (
            self.field_readers == other.field_readers and self.create_struct == other.create_struct
            if isinstance(other, StructReader)
            else False
        )

    def __repr__(self) -> str:
        """Return the string representation of the StructReader class."""
        return f"StructReader(({','.join(repr(field) for field in self.field_readers)}), {repr(self.create_struct)})"

    def __hash__(self) -> int:
        """Return a hashed representation of the StructReader class."""
        return self._hash


@dataclass(frozen=False, init=False)
class ListReader(Reader):
    """Reads a block-encoded array, decoding every element with `element`."""

    __slots__ = ("element", "_is_int_list", "_hash")
    element: Reader

    def __init__(self, element: Reader) -> None:
        super().__init__()
        self.element = element
        self._hash = hash(self.element)
        # Lists of integers are read in bulk through `decoder.read_ints`
        self._is_int_list = isinstance(self.element, IntegerReader)

    def read(self, decoder: BinaryDecoder) -> List[Any]:
        read_items: List[Any] = []
        block_count = decoder.read_int()
        while block_count != 0:
            if block_count < 0:
                block_count = -block_count
                # A negative count is followed by the block size in bytes, which is not needed here
                _ = decoder.read_int()
            if self._is_int_list:
                read_items.extend(decoder.read_ints(block_count))
            else:
                for _ in range(block_count):
                    read_items.append(self.element.read(decoder))
            block_count = decoder.read_int()
        return read_items

    def skip(self, decoder: BinaryDecoder) -> None:
        _skip_map_array(decoder, lambda: self.element.skip(decoder))

    def __hash__(self) -> int:
        """Return a hashed representation of the ListReader class."""
        return self._hash


# Represent an empty dict as a singleton
EMPTY_DICT: dict[Any, Any] = {}


@dataclass(frozen=False, init=False)
class MapReader(Reader):
    """Reads a block-encoded map, with dedicated paths for int->int and int->bytes maps."""

    __slots__ = ("key", "value", "_is_int_int", "_is_int_bytes", "_key_reader", "_value_reader", "_hash")
    key: Reader
    value: Reader

    def __init__(self, key: Reader, value: Reader) -> None:
        super().__init__()
        self.key = key
        self.value = value
        # The specialized paths only apply when the key is an integer
        if isinstance(self.key, IntegerReader):
            self._is_int_int = isinstance(self.value, IntegerReader)
            self._is_int_bytes = isinstance(self.value, BinaryReader)
        else:
            self._is_int_int = False
            self._is_int_bytes = False
            # Only the generic path uses the pre-resolved bound methods
            self._key_reader = self.key.read
            self._value_reader = self.value.read
        self._hash = hash((self.key, self.value))

    def _read_int_int(self, decoder: BinaryDecoder) -> Mapping[int, int]:
        """Read a mapping from int to int from the decoder.

        Read a map of ints to ints from the decoder, since this is such a common
        data type, it is optimized to be faster than the generic map reader, by
        using a lazy dict.

        The time it takes to create the python dictionary is much larger than
        the time it takes to read the data from the decoder as an array, so the
        lazy dict defers creating the python dictionary until it is actually
        accessed.
        """
        block_count = decoder.read_int()

        # Often times the map is empty, so we can just return an empty dict without
        # instancing the LazyDict
        if block_count == 0:
            return EMPTY_DICT

        contents_array: List[Tuple[int, ...]] = []

        while block_count != 0:
            if block_count < 0:
                block_count = -block_count
                # We ignore the block size for now
                decoder.skip_int()

            # Since the integers are encoding right next to each other
            # just read them all at once.
            contents_array.append(decoder.read_ints(block_count * 2))
            block_count = decoder.read_int()

        return LazyDict(contents_array)

    def read(self, decoder: BinaryDecoder) -> Mapping[Any, Any]:
        # Fast path: int -> int maps are materialized lazily
        if self._is_int_int:
            return self._read_int_int(decoder)

        read_items: dict[Any, Any] = {}
        block_count = decoder.read_int()

        if self._is_int_bytes:
            while block_count != 0:
                if block_count < 0:
                    block_count = -block_count
                    # We ignore the block size for now
                    _ = decoder.read_int()
                # The decoder fills `read_items` with the entries of the block
                decoder.read_int_bytes_dict(block_count, read_items)
                block_count = decoder.read_int()
        else:
            while block_count != 0:
                if block_count < 0:
                    block_count = -block_count
                    # We ignore the block size for now
                    _ = decoder.read_int()
                for _ in range(block_count):
                    key = self._key_reader(decoder)
                    read_items[key] = self._value_reader(decoder)
                block_count = decoder.read_int()

        return read_items

    def skip(self, decoder: BinaryDecoder) -> None:
        def skip() -> None:
            # One map entry is the key directly followed by the value
            self.key.skip(decoder)
            self.value.skip(decoder)

        _skip_map_array(decoder, skip)

    def __hash__(self) -> int:
        """Return a hashed representation of the MapReader class."""
        return self._hash