pyiceberg/avro/decoder_fast.pyx (95 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import cython
from cython.cimports.cpython import array
from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
from cpython.mem cimport PyMem_Malloc, PyMem_Realloc, PyMem_Free
from libc.string cimport memcpy
from libc.stdint cimport uint64_t, int64_t
import array
cdef extern from "decoder_basic.c":
void decode_zigzag_ints(const unsigned char **buffer, const uint64_t count, uint64_t *result);
void skip_zigzag_int(const unsigned char **buffer);
unsigned_long_long_array_template = cython.declare(array.array, array.array('Q', []))
@cython.final
cdef class CythonBinaryDecoder:
"""Implement a BinaryDecoder that reads from an in-memory buffer."""
# This the data that is duplicated when the decoder is created.
cdef unsigned char *_data
# This is the current pointer to the buffer.
cdef const unsigned char *_current
# This is the address after the data buffer
cdef const unsigned char *_end
# This is the size of the buffer of the data being parsed.
cdef uint64_t _size
def __cinit__(self, input_contents: bytes) -> None:
self._size = len(input_contents)
# Make a copy of the data so the data can be iterated.
self._data = <unsigned char *> PyMem_Malloc(self._size * sizeof(char))
if not self._data:
raise MemoryError()
cdef const unsigned char *input_as_array = input_contents
memcpy(self._data, input_as_array, self._size)
self._end = self._data + self._size
self._current = self._data
def __dealloc__(self):
PyMem_Free(self._data)
cpdef unsigned int tell(self):
"""Return the current stream position."""
return self._current - self._data
cpdef bytes read(self, n: int):
"""Read n bytes."""
if n < 0:
raise ValueError(f"Requested {n} bytes to read, expected positive integer.")
cdef const unsigned char *r = self._current
self._current += n
return r[0:n]
def read_boolean(self) -> bool:
"""Reads a value from the stream as a boolean.
A boolean is written as a single byte
whose value is either 0 (false) or 1 (true).
"""
self._current += 1;
return self._current[-1] != 0
cpdef inline int64_t read_int(self):
"""Reads a value from the stream as an integer.
int/long values are written using variable-length, zigzag coding.
"""
cdef uint64_t result;
if self._current >= self._end:
raise EOFError(f"EOF: read 1 bytes")
decode_zigzag_ints(&self._current, 1, &result)
return result
def read_ints(self, count: int) -> array.array[int]:
"""Reads a list of integers."""
newarray = array.clone(unsigned_long_long_array_template, count, zero=False)
if self._current >= self._end:
raise EOFError(f"EOF: read 1 bytes")
decode_zigzag_ints(&self._current, count, <uint64_t *>newarray.data.as_ulonglongs)
return newarray
cpdef void read_int_bytes_dict(self, count: int, dest: Dict[int, bytes]):
"""Reads a dictionary of integers for keys and bytes for values into a destination dict."""
cdef uint64_t result[2];
if self._current >= self._end:
raise EOFError(f"EOF: read 1 bytes")
for _ in range(count):
decode_zigzag_ints(&self._current, 2, <uint64_t *>&result)
if result[1] <= 0:
dest[result[0]] = b""
else:
dest[result[0]] = self._current[0:result[1]]
self._current += result[1]
cpdef inline bytes read_bytes(self):
"""Bytes are encoded as a long followed by that many bytes of data."""
cdef uint64_t length;
if self._current >= self._end:
raise EOFError(f"EOF: read 1 bytes")
decode_zigzag_ints(&self._current, 1, &length)
if length <= 0:
return b""
cdef const unsigned char *r = self._current
self._current += length
return r[0:length]
cpdef float read_float(self):
"""Reads a value from the stream as a float.
A float is written as 4 bytes.
The float is converted into a 32-bit integer using a method equivalent to
Java's floatToIntBits and then encoded in little-endian format.
"""
return float(STRUCT_FLOAT.unpack(self.read(4))[0])
cpdef float read_double(self):
"""Reads a value from the stream as a double.
A double is written as 8 bytes.
The double is converted into a 64-bit integer using a method equivalent to
Java's doubleToLongBits and then encoded in little-endian format.
"""
return float(STRUCT_DOUBLE.unpack(self.read(8))[0])
cpdef str read_utf8(self):
"""Reads a utf-8 encoded string from the stream.
A string is encoded as a long followed by
that many bytes of UTF-8 encoded character data.
"""
return self.read_bytes().decode("utf-8")
def skip_int(self) -> None:
skip_zigzag_int(&self._current)
return
def skip(self, n: int) -> None:
self._current += n
def skip_boolean(self) -> None:
self._current += 1
def skip_float(self) -> None:
self._current += 4
def skip_double(self) -> None:
self._current += 8
def skip_bytes(self) -> None:
cdef uint64_t result;
decode_zigzag_ints(&self._current, 1, &result)
self._current += result
def skip_utf8(self) -> None:
self.skip_bytes()