pyiceberg/serializers.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import codecs
import gzip
from abc import ABC, abstractmethod
from typing import Callable

from pyiceberg.io import InputFile, InputStream, OutputFile
from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil
from pyiceberg.typedef import UTF8
from pyiceberg.utils.config import Config

GZIP = "gzip"


class Compressor(ABC):
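    """Abstract base class for compressing and decompressing table metadata payloads."""
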
@staticmethod
def get_compressor(location: str) -> Compressor:
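        """Return a GzipCompressor for `*.gz.metadata.json` locations, otherwise the no-op compressor."""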
        return GzipCompressor() if location.endswith(".gz.metadata.json") else NOOP_COMPRESSOR

    @abstractmethod
    def stream_decompressor(self, inp: InputStream) -> InputStream:
        """Return a stream decompressor.

        Args:
            inp: The input stream that needs decompressing.

        Returns:
            The wrapped stream.
        """

    @abstractmethod
    def bytes_compressor(self) -> Callable[[bytes], bytes]:
        """Return a function to compress bytes.

        Returns:
            A function that can be used to compress bytes.
        """


class NoopCompressor(Compressor):
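    """No-op codec that returns streams and byte payloads unchanged."""
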
    def stream_decompressor(self, inp: InputStream) -> InputStream:
        return inp

    def bytes_compressor(self) -> Callable[[bytes], bytes]:
        return lambda b: b


NOOP_COMPRESSOR = NoopCompressor()


class GzipCompressor(Compressor):
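    """Codec that decompresses gzip streams and gzip-compresses byte payloads."""
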
    def stream_decompressor(self, inp: InputStream) -> InputStream:
        return gzip.open(inp)

    def bytes_compressor(self) -> Callable[[bytes], bytes]:
        return gzip.compress


class FromByteStream:
"""A collection of methods that deserialize dictionaries into Iceberg objects."""
@staticmethod
def table_metadata(
byte_stream: InputStream, encoding: str = UTF8, compression: Compressor = NOOP_COMPRESSOR
) -> TableMetadata:
"""Instantiate a TableMetadata object from a byte stream.
Args:
byte_stream: A file-like byte stream object.
encoding (default "utf-8"): The byte encoder to use for the reader.
compression: Optional compression method
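
        Example:
            A minimal sketch, assuming an uncompressed metadata file at an illustrative local path:

                with open("/tmp/v3.metadata.json", "rb") as f:
                    metadata = FromByteStream.table_metadata(f)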
"""
with compression.stream_decompressor(byte_stream) as byte_stream:
reader = codecs.getreader(encoding)
json_bytes = reader(byte_stream)
metadata = json_bytes.read()
        return TableMetadataUtil.parse_raw(metadata)


class FromInputFile:
"""A collection of methods that deserialize InputFiles into Iceberg objects."""
@staticmethod
def table_metadata(input_file: InputFile, encoding: str = UTF8) -> TableMetadata:
"""Create a TableMetadata instance from an input file.
Args:
input_file (InputFile): A custom implementation of the iceberg.io.file.InputFile abstract base class.
encoding (str): Encoding to use when loading bytestream.
Returns:
TableMetadata: A table metadata instance.
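
        Example:
            A minimal sketch, assuming `PyArrowFileIO` is available and the metadata path is purely illustrative:

                from pyiceberg.io.pyarrow import PyArrowFileIO

                input_file = PyArrowFileIO().new_input("s3://bucket/db/tbl/metadata/v3.metadata.json")
                metadata = FromInputFile.table_metadata(input_file)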
"""
with input_file.open() as input_stream:
return FromByteStream.table_metadata(
byte_stream=input_stream, encoding=encoding, compression=Compressor.get_compressor(location=input_file.location)
            )


class ToOutputFile:
"""A collection of methods that serialize Iceberg objects into files given an OutputFile instance."""
@staticmethod
def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite: bool = False) -> None:
"""Write a TableMetadata instance to an output file.
Args:
output_file (OutputFile): A custom implementation of the iceberg.io.file.OutputFile abstract base class.
overwrite (bool): Where to overwrite the file if it already exists. Defaults to `False`.
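
        Example:
            A minimal sketch, assuming `PyArrowFileIO` is available, `metadata` is an existing `TableMetadata`
            instance, and the target path is illustrative; the `.gz.metadata.json` suffix selects gzip compression:

                from pyiceberg.io.pyarrow import PyArrowFileIO

                output_file = PyArrowFileIO().new_output("s3://bucket/db/tbl/metadata/v4.gz.metadata.json")
                ToOutputFile.table_metadata(metadata, output_file, overwrite=True)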
"""
with output_file.create(overwrite=overwrite) as output_stream:
            # Keep None values when `legacy-current-snapshot-id` is set, so a `None` current-snapshot-id is dumped as `-1`
            exclude_none = not Config().get_bool("legacy-current-snapshot-id")
json_bytes = metadata.model_dump_json(exclude_none=exclude_none).encode(UTF8)
json_bytes = Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes)
output_stream.write(json_bytes)