pyiceberg/avro/decoder_fast.pyx (95 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import cython
from cython.cimports.cpython import array
from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
from cpython.mem cimport PyMem_Malloc, PyMem_Realloc, PyMem_Free
from libc.string cimport memcpy
from libc.stdint cimport uint64_t, int64_t

import array


cdef extern from "decoder_basic.c":
    # Decode `count` zigzag varints starting at *buffer into `result`; *buffer is
    # advanced past the bytes consumed. The C routines receive no end pointer,
    # so callers in this file check the remaining length before calling them.
    void decode_zigzag_ints(const unsigned char **buffer, const uint64_t count, uint64_t *result)
    # Advance *buffer past a single zigzag varint without decoding it.
    void skip_zigzag_int(const unsigned char **buffer)

# Empty unsigned-64-bit array used as the prototype for array.clone() in read_ints().
unsigned_long_long_array_template = cython.declare(array.array, array.array('Q', []))


@cython.final
cdef class CythonBinaryDecoder:
    """Implement a BinaryDecoder that reads from an in-memory buffer."""

    # This is the data that is duplicated when the decoder is created.
    cdef unsigned char *_data

    # This is the current pointer to the buffer.
    cdef const unsigned char *_current

    # This is the address after the data buffer.
    cdef const unsigned char *_end

    # This is the size of the buffer of the data being parsed.
    cdef uint64_t _size

    def __cinit__(self, input_contents: bytes) -> None:
        """Copy `input_contents` into a private buffer and point the cursor at its start.

        Raises:
            MemoryError: If the private buffer cannot be allocated.
        """
        self._size = len(input_contents)

        # Make a copy of the data so the data can be iterated.
        self._data = <unsigned char *> PyMem_Malloc(self._size * sizeof(char))
        if not self._data:
            raise MemoryError()
        cdef const unsigned char *input_as_array = input_contents
        memcpy(self._data, input_as_array, self._size)
        self._end = self._data + self._size
        self._current = self._data

    def __dealloc__(self):
        """Release the private copy of the input made in __cinit__."""
        PyMem_Free(self._data)

    cdef inline int _require(self, uint64_t n) except -1:
        """Raise EOFError unless at least `n` unread bytes remain in the buffer.

        The skip helpers advance the cursor without bounds checks, so the
        cursor may already sit past `_end`; that case counts as zero bytes
        remaining instead of producing a negative pointer difference.
        """
        cdef uint64_t remaining = 0
        if self._current < self._end:
            remaining = <uint64_t>(self._end - self._current)
        if n > remaining:
            raise EOFError(f"EOF: requested {n} bytes, {remaining} available")
        return 0

    # Returned as uint64_t (matching `_size`) so offsets beyond 4 GiB do not wrap.
    cpdef uint64_t tell(self):
        """Return the current stream position."""
        return self._current - self._data

    cpdef bytes read(self, n: int):
        """Read n bytes.

        Raises:
            ValueError: If `n` is negative.
            EOFError: If fewer than `n` bytes remain in the buffer.
        """
        if n < 0:
            raise ValueError(f"Requested {n} bytes to read, expected positive integer.")
        # Without this check the slice below would copy memory past the buffer.
        self._require(n)
        cdef const unsigned char *r = self._current
        self._current += n
        return r[0:n]

    def read_boolean(self) -> bool:
        """Reads a value from the stream as a boolean.

        A boolean is written as a single byte
        whose value is either 0 (false) or 1 (true).

        Raises:
            EOFError: If no bytes remain in the buffer.
        """
        self._require(1)
        self._current += 1
        return self._current[-1] != 0

    cpdef inline int64_t read_int(self):
        """Reads a value from the stream as an integer.

        int/long values are written using variable-length, zigzag coding.

        Raises:
            EOFError: If the cursor is at or past the end of the buffer.
        """
        cdef uint64_t result
        if self._current >= self._end:
            raise EOFError(f"EOF: read 1 bytes")
        decode_zigzag_ints(&self._current, 1, &result)
        return result

    def read_ints(self, count: int) -> array.array[int]:
        """Reads a list of integers.

        Returns:
            An array of typecode 'Q' holding the `count` decoded values.

        Raises:
            EOFError: If the cursor is at the end of the buffer, or fewer than
                `count` bytes remain.
        """
        newarray = array.clone(unsigned_long_long_array_template, count, zero=False)
        if self._current >= self._end:
            raise EOFError(f"EOF: read 1 bytes")
        # Every varint occupies at least one byte, so a count larger than the
        # remaining byte count can never be satisfied from this buffer.
        if count > 0:
            self._require(count)
        decode_zigzag_ints(&self._current, count, <uint64_t *>newarray.data.as_ulonglongs)
        return newarray

    cpdef void read_int_bytes_dict(self, count: int, dest: Dict[int, bytes]):
        """Reads a dictionary of integers for keys and bytes for values into a destination dict.

        Each of the `count` entries is a zigzag int key followed by a zigzag
        length and that many bytes of value data.

        Raises:
            EOFError: If the buffer ends before all entries have been read.
        """
        cdef uint64_t result[2]
        if self._current >= self._end:
            raise EOFError(f"EOF: read 1 bytes")

        for _ in range(count):
            # The key and the length are each at least one byte long.
            self._require(2)
            decode_zigzag_ints(&self._current, 2, <uint64_t *>&result)
            if result[1] == 0:
                dest[result[0]] = b""
            else:
                # A corrupt (e.g. negative) length decodes to a huge unsigned
                # value; reject it before slicing past the end of the buffer.
                self._require(result[1])
                dest[result[0]] = self._current[0:result[1]]
                self._current += result[1]

    cpdef inline bytes read_bytes(self):
        """Bytes are encoded as a long followed by that many bytes of data.

        Raises:
            EOFError: If the buffer ends before the length prefix or the payload.
        """
        cdef uint64_t length
        if self._current >= self._end:
            raise EOFError(f"EOF: read 1 bytes")

        decode_zigzag_ints(&self._current, 1, &length)

        if length == 0:
            return b""
        # Guards against truncated input and against negative lengths, which
        # show up here as very large unsigned values.
        self._require(length)
        cdef const unsigned char *r = self._current
        self._current += length
        return r[0:length]

    cpdef float read_float(self):
        """Reads a value from the stream as a float.

        A float is written as 4 bytes.
        The float is converted into a 32-bit integer using a method equivalent to
        Java's floatToIntBits and then encoded in little-endian format.
        """
        return float(STRUCT_FLOAT.unpack(self.read(4))[0])

    # Declared `double`: a C `float` return type would narrow the 64-bit value
    # to single precision before it is handed back to the caller.
    cpdef double read_double(self):
        """Reads a value from the stream as a double.

        A double is written as 8 bytes.
        The double is converted into a 64-bit integer using a method equivalent to
        Java's doubleToLongBits and then encoded in little-endian format.
        """
        return float(STRUCT_DOUBLE.unpack(self.read(8))[0])

    cpdef str read_utf8(self):
        """Reads a utf-8 encoded string from the stream.

        A string is encoded as a long followed by
        that many bytes of UTF-8 encoded character data.
        """
        return self.read_bytes().decode("utf-8")

    def skip_int(self) -> None:
        """Skip one zigzag-encoded integer.

        Raises:
            EOFError: If the cursor is at or past the end of the buffer.
        """
        if self._current >= self._end:
            raise EOFError(f"EOF: read 1 bytes")
        skip_zigzag_int(&self._current)

    # The fixed-width skips below only move the cursor and never dereference
    # it; the read methods reject a cursor that has moved past the end.
    def skip(self, n: int) -> None:
        """Advance the cursor by `n` bytes without reading them."""
        self._current += n

    def skip_boolean(self) -> None:
        """Advance the cursor past a one-byte boolean."""
        self._current += 1

    def skip_float(self) -> None:
        """Advance the cursor past a four-byte float."""
        self._current += 4

    def skip_double(self) -> None:
        """Advance the cursor past an eight-byte double."""
        self._current += 8

    def skip_bytes(self) -> None:
        """Skip a length-prefixed byte string.

        Raises:
            EOFError: If the cursor is at or past the end of the buffer.
        """
        cdef uint64_t result
        if self._current >= self._end:
            raise EOFError(f"EOF: read 1 bytes")
        decode_zigzag_ints(&self._current, 1, &result)
        self._current += result

    def skip_utf8(self) -> None:
        """Skip a length-prefixed UTF-8 string (encoded identically to bytes)."""
        self.skip_bytes()