# pyiceberg/avro/encoder.py (33 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any
from uuid import UUID
from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
from pyiceberg.io import OutputStream
from pyiceberg.typedef import UTF8
class BinaryEncoder:
    """Encodes Python physical types into bytes.

    Every ``write_*`` method serializes one value and forwards the resulting
    bytes to the output stream handed to the constructor.
    """

    # Sink that receives every encoded byte sequence.
    _output_stream: OutputStream

    def __init__(self, output_stream: OutputStream) -> None:
        """Create an encoder that writes to ``output_stream``.

        Args:
            output_stream: The stream that receives the encoded bytes.
        """
        self._output_stream = output_stream

    def write(self, b: bytes) -> None:
        """Write raw bytes to the underlying output stream unchanged.

        Args:
            b: The bytes to write.
        """
        self._output_stream.write(b)

    def write_boolean(self, boolean: bool) -> None:
        """Write a boolean as a single byte whose value is either 0 (false) or 1 (true).

        Args:
            boolean: The boolean to write.
        """
        # bool() normalizes any truthy/falsy input to exactly 1 or 0.
        self.write(bytearray([bool(boolean)]))

    def write_int(self, integer: int) -> None:
        """Integer and long values are written using variable-length zig-zag coding.

        Args:
            integer: The value to write; must fit in a signed 64-bit long.

        Raises:
            ValueError: If ``integer`` is outside the signed 64-bit range.
        """
        # The zig-zag step below relies on ``integer >> 63`` being 0 or -1, which
        # only holds for 64-bit values. Python ints are unbounded, so reject
        # anything larger rather than silently emitting a corrupt varint.
        if not -(1 << 63) <= integer < (1 << 63):
            raise ValueError(f"Integer out of range for a 64-bit long: {integer}")

        # Zig-zag: interleave negatives and positives so small magnitudes
        # become small unsigned numbers (0, -1, 1, -2 -> 0, 1, 2, 3).
        datum = (integer << 1) ^ (integer >> 63)

        # Varint: 7 payload bits per byte, high bit set on all but the last byte.
        # Collect the bytes first so the stream sees a single write.
        encoded = bytearray()
        while (datum & ~0x7F) != 0:
            encoded.append((datum & 0x7F) | 0x80)
            datum >>= 7
        encoded.append(datum)
        self.write(encoded)

    def write_float(self, f: float) -> None:
        """Write a float as 4 bytes.

        Args:
            f: The value to write.
        """
        self.write(STRUCT_FLOAT.pack(f))

    def write_double(self, f: float) -> None:
        """Write a double as 8 bytes.

        Args:
            f: The value to write.
        """
        self.write(STRUCT_DOUBLE.pack(f))

    def write_bytes(self, b: bytes) -> None:
        """Bytes are encoded as a long followed by that many bytes of data.

        Args:
            b: The payload to write.
        """
        self.write_int(len(b))
        self.write(b)

    def write_utf8(self, s: str) -> None:
        """Encode a string as a long followed by that many bytes of UTF-8 encoded character data.

        Args:
            s: The string to write.
        """
        self.write_bytes(s.encode(UTF8))

    def write_uuid(self, uuid: UUID) -> None:
        """Write UUID as a fixed[16].

        The uuid logical type represents a random generated universally unique identifier (UUID).
        An uuid logical type annotates an Avro string. The string has to conform with RFC-4122.

        Args:
            uuid: The UUID whose 16 raw bytes are written without a length prefix.

        Raises:
            ValueError: If ``uuid.bytes`` is not exactly 16 bytes long.
        """
        raw = uuid.bytes
        if len(raw) != 16:
            raise ValueError(f"Expected UUID to have 16 bytes, got: {len(raw)}")
        self.write(raw)

    def write_unknown(self, _: Any) -> None:
        """Nulls are written as 0 bytes in avro, so we do nothing."""