python/pyfury/_serialization.pyx (1,870 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# distutils: language = c++
# cython: embedsignature = True
# cython: language_level = 3
# cython: annotate = True
import datetime
import logging
import os
import platform
import time
import warnings
from typing import TypeVar, Union, Iterable
from pyfury._util import get_bit, set_bit, clear_bit
from pyfury import _fury as fmod
from pyfury._fury import Language
from pyfury._fury import _PicklerStub, _UnpicklerStub, Pickler, Unpickler
from pyfury._fury import _ENABLE_CLASS_REGISTRATION_FORCIBLY
from pyfury.lib import mmh3
from pyfury.meta.metastring import Encoding
from pyfury.type import is_primitive_type
from pyfury.util import is_little_endian
from pyfury.includes.libserialization cimport \
(TypeId, IsNamespacedType, Fury_PyBooleanSequenceWriteToBuffer, Fury_PyFloatSequenceWriteToBuffer)
from libc.stdint cimport int8_t, int16_t, int32_t, int64_t, uint64_t
from libc.stdint cimport *
from libcpp.vector cimport vector
from cpython cimport PyObject
from cpython.dict cimport PyDict_Next
from cpython.ref cimport *
from cpython.list cimport PyList_New, PyList_SET_ITEM
from cpython.tuple cimport PyTuple_New, PyTuple_SET_ITEM
from libcpp cimport bool as c_bool
from libcpp.utility cimport pair
from cython.operator cimport dereference as deref
from pyfury._util cimport Buffer
from pyfury.includes.libabsl cimport flat_hash_map
try:
import numpy as np
except ImportError:
np = None
cimport cython
logger = logging.getLogger(__name__)
ENABLE_FURY_CYTHON_SERIALIZATION = os.environ.get(
"ENABLE_FURY_CYTHON_SERIALIZATION", "True").lower() in ("true", "1")
cdef extern from *:
"""
#define int2obj(obj_addr) ((PyObject *)(obj_addr))
#define obj2int(obj_ref) (Py_INCREF(obj_ref), ((int64_t)(obj_ref)))
"""
object int2obj(int64_t obj_addr)
int64_t obj2int(object obj_ref)
dict _PyDict_NewPresized(Py_ssize_t minused)
Py_ssize_t Py_SIZE(object obj)
cdef int8_t NULL_FLAG = -3
# This flag indicates that object is a not-null value.
# We don't use another byte to indicate REF, so that we can save one byte.
cdef int8_t REF_FLAG = -2
# this flag indicates that the object is a non-null value.
cdef int8_t NOT_NULL_VALUE_FLAG = -1
# this flag indicates that the object is a referencable and first read.
cdef int8_t REF_VALUE_FLAG = 0
@cython.final
cdef class MapRefResolver:
cdef flat_hash_map[uint64_t, int32_t] written_objects_id # id(obj) -> ref_id
# Hold object to avoid tmp object gc when serialize nested fields/objects.
cdef vector[PyObject *] written_objects
cdef vector[PyObject *] read_objects
cdef vector[int32_t] read_ref_ids
cdef object read_object
cdef c_bool ref_tracking
def __cinit__(self, c_bool ref_tracking):
self.read_object = None
self.ref_tracking = ref_tracking
# Special methods of extension types must be declared with def, not cdef.
def __dealloc__(self):
self.reset()
cpdef inline c_bool write_ref_or_null(self, Buffer buffer, obj):
if not self.ref_tracking:
if obj is None:
buffer.write_int8(NULL_FLAG)
return True
else:
buffer.write_int8(NOT_NULL_VALUE_FLAG)
return False
if obj is None:
buffer.write_int8(NULL_FLAG)
return True
cdef uint64_t object_id = <uintptr_t> <PyObject *> obj
cdef int32_t next_id
cdef flat_hash_map[uint64_t, int32_t].iterator it = \
self.written_objects_id.find(object_id)
if it == self.written_objects_id.end():
next_id = self.written_objects_id.size()
self.written_objects_id[object_id] = next_id
self.written_objects.push_back(<PyObject *> obj)
Py_INCREF(obj)
buffer.write_int8(REF_VALUE_FLAG)
return False
else:
# The obj has been written previously.
buffer.write_int8(REF_FLAG)
buffer.write_varuint32(<uint64_t> deref(it).second)
return True
cpdef inline int8_t read_ref_or_null(self, Buffer buffer):
cdef int8_t head_flag = buffer.read_int8()
if not self.ref_tracking:
return head_flag
cdef int32_t ref_id
if head_flag == REF_FLAG:
# read reference id and get object from reference resolver
ref_id = buffer.read_varuint32()
self.read_object = <object> (self.read_objects[ref_id])
return REF_FLAG
else:
self.read_object = None
return head_flag
cpdef inline int32_t preserve_ref_id(self):
if not self.ref_tracking:
return -1
next_read_ref_id = self.read_objects.size()
self.read_objects.push_back(NULL)
self.read_ref_ids.push_back(next_read_ref_id)
return next_read_ref_id
cpdef inline int32_t try_preserve_ref_id(self, Buffer buffer):
if not self.ref_tracking:
# `NOT_NULL_VALUE_FLAG` can be used as stub reference id because we use
# `refId >= NOT_NULL_VALUE_FLAG` to read data.
return buffer.read_int8()
head_flag = buffer.read_int8()
if head_flag == REF_FLAG:
# read reference id and get object from reference resolver
ref_id = buffer.read_varuint32()
self.read_object = <object> (self.read_objects[ref_id])
# `head_flag` except `REF_FLAG` can be used as stub reference id because
# we use `refId >= NOT_NULL_VALUE_FLAG` to read data.
return head_flag
else:
self.read_object = None
if head_flag == REF_VALUE_FLAG:
return self.preserve_ref_id()
return head_flag
cpdef inline reference(self, obj):
if not self.ref_tracking:
return
cdef int32_t ref_id = self.read_ref_ids.back()
self.read_ref_ids.pop_back()
cdef c_bool need_inc = self.read_objects[ref_id] == NULL
if need_inc:
Py_INCREF(obj)
self.read_objects[ref_id] = <PyObject *> obj
cpdef inline get_read_object(self, id_=None):
if not self.ref_tracking:
return None
if id_ is None:
return self.read_object
cdef int32_t ref_id = id_
return <object> (self.read_objects[ref_id])
cpdef inline set_read_object(self, int32_t ref_id, obj):
if not self.ref_tracking:
return
if ref_id >= 0:
need_inc = self.read_objects[ref_id] == NULL
if need_inc:
Py_INCREF(obj)
self.read_objects[ref_id] = <PyObject *> obj
cpdef inline reset(self):
self.reset_write()
self.reset_read()
cpdef inline reset_write(self):
self.written_objects_id.clear()
for item in self.written_objects:
Py_XDECREF(item)
self.written_objects.clear()
cpdef inline reset_read(self):
if not self.ref_tracking:
return
for item in self.read_objects:
Py_XDECREF(item)
self.read_objects.clear()
self.read_ref_ids.clear()
self.read_object = None
cdef int8_t USE_CLASSNAME = 0
cdef int8_t USE_CLASS_ID = 1
# preserve 0 as flag for class id not set in ClassInfo`
cdef int8_t NO_CLASS_ID = 0
cdef int8_t DEFAULT_DYNAMIC_WRITE_META_STR_ID = fmod.DEFAULT_DYNAMIC_WRITE_META_STR_ID
cdef int8_t INT64_CLASS_ID = fmod.INT64_CLASS_ID
cdef int8_t FLOAT64_CLASS_ID = fmod.FLOAT64_CLASS_ID
cdef int8_t BOOL_CLASS_ID = fmod.BOOL_CLASS_ID
cdef int8_t STRING_CLASS_ID = fmod.STRING_CLASS_ID
cdef int16_t MAGIC_NUMBER = fmod.MAGIC_NUMBER
cdef int32_t NOT_NULL_INT64_FLAG = fmod.NOT_NULL_INT64_FLAG
cdef int32_t NOT_NULL_FLOAT64_FLAG = fmod.NOT_NULL_FLOAT64_FLAG
cdef int32_t NOT_NULL_BOOL_FLAG = fmod.NOT_NULL_BOOL_FLAG
cdef int32_t NOT_NULL_STRING_FLAG = fmod.NOT_NULL_STRING_FLAG
cdef int32_t SMALL_STRING_THRESHOLD = fmod.SMALL_STRING_THRESHOLD
@cython.final
cdef class MetaStringBytes:
cdef public bytes data
cdef int16_t length
cdef public int8_t encoding
cdef public int64_t hashcode
cdef public int16_t dynamic_write_string_id
def __init__(self, data, hashcode):
self.data = data
self.length = len(data)
self.hashcode = hashcode
self.encoding = hashcode & 0xff
self.dynamic_write_string_id = DEFAULT_DYNAMIC_WRITE_META_STR_ID
def __eq__(self, other):
return type(other) is MetaStringBytes and other.hashcode == self.hashcode
def __hash__(self):
return self.hashcode
def decode(self, decoder):
return decoder.decode(self.data, Encoding(self.encoding))
def __repr__(self):
return f"MetaStringBytes(data={self.data}, hashcode={self.hashcode})"
@cython.final
cdef class MetaStringResolver:
cdef:
int16_t dynamic_write_string_id
vector[PyObject *] _c_dynamic_written_enum_string
vector[PyObject *] _c_dynamic_id_to_enum_string_vec
# hash -> MetaStringBytes
flat_hash_map[int64_t, PyObject *] _c_hash_to_metastr_bytes
flat_hash_map[pair[int64_t, int64_t], PyObject *] _c_hash_to_small_metastring_bytes
set _enum_str_set
dict _metastr_to_metastr_bytes
def __init__(self):
self._enum_str_set = set()
self._metastr_to_metastr_bytes = dict()
cpdef inline write_meta_string_bytes(
self, Buffer buffer, MetaStringBytes metastr_bytes):
cdef int16_t dynamic_type_id = metastr_bytes.dynamic_write_string_id
cdef int32_t length = metastr_bytes.length
if dynamic_type_id == DEFAULT_DYNAMIC_WRITE_META_STR_ID:
dynamic_type_id = self.dynamic_write_string_id
metastr_bytes.dynamic_write_string_id = dynamic_type_id
self.dynamic_write_string_id += 1
self._c_dynamic_written_enum_string.push_back(<PyObject *> metastr_bytes)
buffer.write_varuint32(length << 1)
if length <= SMALL_STRING_THRESHOLD:
buffer.write_int8(metastr_bytes.encoding)
else:
buffer.write_int64(metastr_bytes.hashcode)
buffer.write_bytes(metastr_bytes.data)
else:
buffer.write_varuint32(((dynamic_type_id + 1) << 1) | 1)
cpdef inline MetaStringBytes read_meta_string_bytes(self, Buffer buffer):
cdef int32_t header = buffer.read_varuint32()
cdef int32_t length = header >> 1
if header & 0b1 != 0:
return <MetaStringBytes> self._c_dynamic_id_to_enum_string_vec[length - 1]
cdef int64_t v1 = 0, v2 = 0, hashcode
cdef PyObject * enum_str_ptr
cdef int32_t reader_index
cdef encoding = 0
if length <= SMALL_STRING_THRESHOLD:
encoding = buffer.read_int8()
if length <= 8:
v1 = buffer.read_bytes_as_int64(length)
else:
v1 = buffer.read_int64()
v2 = buffer.read_bytes_as_int64(length - 8)
hashcode = ((v1 * 31 + v2) >> 8 << 8) | encoding
enum_str_ptr = self._c_hash_to_small_metastring_bytes[pair[int64_t, int64_t](v1, v2)]
if enum_str_ptr == NULL:
reader_index = buffer.reader_index
str_bytes = buffer.get_bytes(reader_index - length, length)
enum_str = MetaStringBytes(str_bytes, hashcode=hashcode)
self._enum_str_set.add(enum_str)
enum_str_ptr = <PyObject *> enum_str
self._c_hash_to_small_metastring_bytes[pair[int64_t, int64_t](v1, v2)] = enum_str_ptr
else:
hashcode = buffer.read_int64()
reader_index = buffer.reader_index
buffer.check_bound(reader_index, length)
buffer.reader_index = reader_index + length
enum_str_ptr = self._c_hash_to_metastr_bytes[hashcode]
if enum_str_ptr == NULL:
str_bytes = buffer.get_bytes(reader_index, length)
enum_str = MetaStringBytes(str_bytes, hashcode=hashcode)
self._enum_str_set.add(enum_str)
enum_str_ptr = <PyObject *> enum_str
self._c_hash_to_metastr_bytes[hashcode] = enum_str_ptr
self._c_dynamic_id_to_enum_string_vec.push_back(enum_str_ptr)
return <MetaStringBytes> enum_str_ptr
def get_metastr_bytes(self, metastr):
metastr_bytes = self._metastr_to_metastr_bytes.get(metastr)
if metastr_bytes is not None:
return metastr_bytes
cdef int64_t v1 = 0, v2 = 0, hashcode
length = len(metastr.encoded_data)
if length <= SMALL_STRING_THRESHOLD:
data_buf = Buffer(metastr.encoded_data)
if length <= 8:
v1 = data_buf.read_bytes_as_int64(length)
else:
v1 = data_buf.read_int64()
v2 = data_buf.read_bytes_as_int64(length - 8)
value_hash = ((v1 * 31 + v2) >> 8 << 8) | metastr.encoding.value
else:
value_hash = mmh3.hash_buffer(metastr.encoded_data, seed=47)[0]
value_hash = value_hash >> 8 << 8
value_hash |= metastr.encoding.value & 0xFF
self._metastr_to_metastr_bytes[metastr] = metastr_bytes = MetaStringBytes(metastr.encoded_data, value_hash)
return metastr_bytes
cpdef inline reset_read(self):
self._c_dynamic_id_to_enum_string_vec.clear()
cpdef inline reset_write(self):
if self.dynamic_write_string_id != 0:
self.dynamic_write_string_id = 0
for ptr in self._c_dynamic_written_enum_string:
(<MetaStringBytes> ptr).dynamic_write_string_id = \
DEFAULT_DYNAMIC_WRITE_META_STR_ID
self._c_dynamic_written_enum_string.clear()
@cython.final
cdef class ClassInfo:
"""
If dynamic_type is true, the serializer will be a dynamic typed serializer
and it will write type info when writing the data.
In such cases, the `write_typeinfo` should not write typeinfo.
In general, if we have 4 type for one class, we will have 5 serializers.
For example, we have int8/16/32/64/128 for python `int` type, then we have 6 serializers
for python `int`: `Int8/1632/64/128Serializer` for `int8/16/32/64/128` each, and another
`IntSerializer` for `int` which will dispatch to different `int8/16/32/64/128` type
according the actual value.
We do not get the acutal type here, because it will introduce extra computing.
For example, we have want to get actual `Int8/16/32/64Serializer`, we must check and
extract the actutal here which will introduce cost, and we will do same thing again
when serializing the actual data.
"""
cdef public object cls
cdef public int16_t type_id
cdef public Serializer serializer
cdef public MetaStringBytes namespace_bytes
cdef public MetaStringBytes typename_bytes
cdef public c_bool dynamic_type
def __init__(
self,
cls: Union[type, TypeVar] = None,
type_id: int = NO_CLASS_ID,
serializer: Serializer = None,
namespace_bytes: MetaStringBytes = None,
typename_bytes: MetaStringBytes = None,
dynamic_type: bool = False,
):
self.cls = cls
self.type_id = type_id
self.serializer = serializer
self.namespace_bytes = namespace_bytes
self.typename_bytes = typename_bytes
self.dynamic_type = dynamic_type
def __repr__(self):
return f"ClassInfo(cls={self.cls}, type_id={self.type_id}, " \
f"serializer={self.serializer})"
@cython.final
cdef class ClassResolver:
cdef:
readonly Fury fury
readonly MetaStringResolver metastring_resolver
object _resolver
vector[PyObject *] _c_registered_id_to_class_info
# cls -> ClassInfo
flat_hash_map[uint64_t, PyObject *] _c_classes_info
# hash -> ClassInfo
flat_hash_map[pair[int64_t, int64_t], PyObject *] _c_meta_hash_to_classinfo
MetaStringResolver meta_string_resolver
def __init__(self, fury):
self.fury = fury
self.metastring_resolver = fury.metastring_resolver
from pyfury._registry import ClassResolver
self._resolver = ClassResolver(fury)
def initialize(self):
self._resolver.initialize()
for classinfo in self._resolver._classes_info.values():
self._populate_typeinfo(classinfo)
def register_type(
self,
cls: Union[type, TypeVar],
*,
type_id: int = None,
namespace: str = None,
typename: str = None,
serializer=None,
):
typeinfo = self._resolver.register_type(
cls,
type_id=type_id,
namespace=namespace,
typename=typename,
serializer=serializer,
)
self._populate_typeinfo(typeinfo)
cdef _populate_typeinfo(self, typeinfo):
type_id = typeinfo.type_id
if type_id >= self._c_registered_id_to_class_info.size():
self._c_registered_id_to_class_info.resize(type_id * 2, NULL)
if type_id > 0 and (self.fury.language == Language.PYTHON or not IsNamespacedType(type_id)):
self._c_registered_id_to_class_info[type_id] = <PyObject *> typeinfo
self._c_classes_info[<uintptr_t> <PyObject *> typeinfo.cls] = <PyObject *> typeinfo
if typeinfo.typename_bytes is not None:
self._load_bytes_to_classinfo(type_id, typeinfo.namespace_bytes, typeinfo.typename_bytes)
def register_serializer(self, cls: Union[type, TypeVar], serializer):
classinfo1 = self._resolver.get_classinfo(cls)
self._resolver.register_serializer(cls, serializer)
classinfo2 = self._resolver.get_classinfo(cls)
if classinfo1.type_id != classinfo2.type_id:
self._c_registered_id_to_class_info[classinfo1.type_id] = NULL
self._populate_typeinfo(classinfo2)
cpdef inline Serializer get_serializer(self, cls):
"""
Returns
-------
Returns or create serializer for the provided class
"""
return self.get_classinfo(cls).serializer
cpdef inline ClassInfo get_classinfo(self, cls, create=True):
cdef PyObject * classinfo_ptr = self._c_classes_info[<uintptr_t> <PyObject *> cls]
cdef ClassInfo class_info
if classinfo_ptr != NULL:
class_info = <object> classinfo_ptr
if class_info.serializer is not None:
return class_info
else:
class_info.serializer = self._resolver._create_serializer(cls)
return class_info
elif not create:
return None
else:
class_info = self._resolver.get_classinfo(cls, create=create)
self._c_classes_info[<uintptr_t> <PyObject *> cls] = <PyObject *> class_info
self._populate_typeinfo(class_info)
return class_info
cdef inline ClassInfo _load_bytes_to_classinfo(
self, int32_t type_id, MetaStringBytes ns_metabytes, MetaStringBytes type_metabytes):
cdef PyObject * classinfo_ptr = self._c_meta_hash_to_classinfo[
pair[int64_t, int64_t](ns_metabytes.hashcode, type_metabytes.hashcode)]
if classinfo_ptr != NULL:
return <ClassInfo> classinfo_ptr
classinfo = self._resolver._load_metabytes_to_classinfo(ns_metabytes, type_metabytes)
classinfo_ptr = <PyObject *> classinfo
self._c_meta_hash_to_classinfo[pair[int64_t, int64_t](
ns_metabytes.hashcode, type_metabytes.hashcode)] = classinfo_ptr
return classinfo
cpdef write_typeinfo(self, Buffer buffer, ClassInfo classinfo):
if classinfo.dynamic_type:
return
cdef:
int32_t type_id = classinfo.type_id
int32_t internal_type_id = type_id & 0xFF
buffer.write_varuint32(type_id)
if IsNamespacedType(internal_type_id):
self.metastring_resolver.write_meta_string_bytes(buffer, classinfo.namespace_bytes)
self.metastring_resolver.write_meta_string_bytes(buffer, classinfo.typename_bytes)
cpdef inline ClassInfo read_typeinfo(self, Buffer buffer):
cdef:
int32_t type_id = buffer.read_varuint32()
int32_t internal_type_id = type_id & 0xFF
cdef MetaStringBytes namespace_bytes, typename_bytes
if IsNamespacedType(internal_type_id):
namespace_bytes = self.metastring_resolver.read_meta_string_bytes(buffer)
typename_bytes = self.metastring_resolver.read_meta_string_bytes(buffer)
return self._load_bytes_to_classinfo(type_id, namespace_bytes, typename_bytes)
if type_id < 0 or type_id > self._c_registered_id_to_class_info.size():
raise ValueError(f"Unexpected type_id {type_id}")
classinfo_ptr = self._c_registered_id_to_class_info[type_id]
if classinfo_ptr == NULL:
raise ValueError(f"Unexpected type_id {type_id}")
classinfo = <ClassInfo> classinfo_ptr
return classinfo
cpdef inline reset(self):
pass
cpdef inline reset_read(self):
pass
cpdef inline reset_write(self):
pass
@cython.final
cdef class Fury:
cdef readonly object language
cdef readonly c_bool ref_tracking
cdef readonly c_bool require_class_registration
cdef readonly c_bool is_py
cdef readonly MapRefResolver ref_resolver
cdef readonly ClassResolver class_resolver
cdef readonly MetaStringResolver metastring_resolver
cdef readonly SerializationContext serialization_context
cdef Buffer buffer
cdef public object pickler # pickle.Pickler
cdef public object unpickler # Optional[pickle.Unpickler]
cdef object _buffer_callback
cdef object _buffers # iterator
cdef object _unsupported_callback
cdef object _unsupported_objects # iterator
cdef object _peer_language
def __init__(
self,
language=Language.XLANG,
ref_tracking: bool = False,
require_class_registration: bool = True,
):
"""
:param require_class_registration:
Whether to require registering classes for serialization, enabled by default.
If disabled, unknown insecure classes can be deserialized, which can be
insecure and cause remote code execution attack if the classes
`__new__`/`__init__`/`__eq__`/`__hash__` method contain malicious code.
Do not disable class registration if you can't ensure your environment are
*indeed secure*. We are not responsible for security risks if
you disable this option.
"""
self.language = language
if _ENABLE_CLASS_REGISTRATION_FORCIBLY or require_class_registration:
self.require_class_registration = True
else:
self.require_class_registration = False
self.ref_tracking = ref_tracking
self.ref_resolver = MapRefResolver(ref_tracking)
self.is_py = self.language == Language.PYTHON
self.metastring_resolver = MetaStringResolver()
self.class_resolver = ClassResolver(self)
self.class_resolver.initialize()
self.serialization_context = SerializationContext()
self.buffer = Buffer.allocate(32)
if not require_class_registration:
warnings.warn(
"Class registration is disabled, unknown classes can be deserialized "
"which may be insecure.",
RuntimeWarning,
stacklevel=2,
)
self.pickler = Pickler(self.buffer)
else:
self.pickler = _PicklerStub()
self.unpickler = _UnpicklerStub()
self.unpickler = None
self._buffer_callback = None
self._buffers = None
self._unsupported_callback = None
self._unsupported_objects = None
self._peer_language = None
def register_serializer(self, cls: Union[type, TypeVar], Serializer serializer):
self.class_resolver.register_serializer(cls, serializer)
def register_type(
self,
cls: Union[type, TypeVar],
*,
type_id: int = None,
namespace: str = None,
typename: str = None,
serializer=None,
):
self.class_resolver.register_type(
cls, type_id=type_id, namespace=namespace, typename=typename, serializer=serializer)
def serialize(
self, obj,
Buffer buffer=None,
buffer_callback=None,
unsupported_callback=None
) -> Union[Buffer, bytes]:
try:
return self._serialize(
obj,
buffer,
buffer_callback=buffer_callback,
unsupported_callback=unsupported_callback)
finally:
self.reset_write()
cpdef inline _serialize(
self, obj, Buffer buffer, buffer_callback=None, unsupported_callback=None):
self._buffer_callback = buffer_callback
self._unsupported_callback = unsupported_callback
if buffer is not None:
self.pickler = Pickler(self.buffer)
else:
self.buffer.writer_index = 0
buffer = self.buffer
if self.language == Language.XLANG:
buffer.write_int16(MAGIC_NUMBER)
cdef int32_t mask_index = buffer.writer_index
# 1byte used for bit mask
buffer.grow(1)
buffer.writer_index = mask_index + 1
if obj is None:
set_bit(buffer, mask_index, 0)
else:
clear_bit(buffer, mask_index, 0)
# set endian
if is_little_endian:
set_bit(buffer, mask_index, 1)
else:
clear_bit(buffer, mask_index, 1)
if self.language == Language.XLANG:
# set reader as x_lang.
set_bit(buffer, mask_index, 2)
# set writer language.
buffer.write_int8(Language.PYTHON.value)
else:
# set reader as native.
clear_bit(buffer, mask_index, 2)
if self._buffer_callback is not None:
set_bit(buffer, mask_index, 3)
else:
clear_bit(buffer, mask_index, 3)
cdef int32_t start_offset
if self.language == Language.PYTHON:
self.serialize_ref(buffer, obj)
else:
self.xserialize_ref(buffer, obj)
if buffer is not self.buffer:
return buffer
else:
return buffer.to_bytes(0, buffer.writer_index)
cpdef inline serialize_ref(
self, Buffer buffer, obj, ClassInfo classinfo=None):
cls = type(obj)
if cls is str:
buffer.write_int16(NOT_NULL_STRING_FLAG)
buffer.write_string(obj)
return
elif cls is int:
buffer.write_int16(NOT_NULL_INT64_FLAG)
buffer.write_varint64(obj)
return
elif cls is bool:
buffer.write_int16(NOT_NULL_BOOL_FLAG)
buffer.write_bool(obj)
return
elif cls is float:
buffer.write_int16(NOT_NULL_FLOAT64_FLAG)
buffer.write_double(obj)
return
if self.ref_resolver.write_ref_or_null(buffer, obj):
return
if classinfo is None:
classinfo = self.class_resolver.get_classinfo(cls)
self.class_resolver.write_typeinfo(buffer, classinfo)
classinfo.serializer.write(buffer, obj)
cpdef inline serialize_nonref(self, Buffer buffer, obj):
cls = type(obj)
if cls is str:
buffer.write_varuint32(STRING_CLASS_ID)
buffer.write_string(obj)
return
elif cls is int:
buffer.write_varuint32(INT64_CLASS_ID)
buffer.write_varint64(obj)
return
elif cls is bool:
buffer.write_varuint32(BOOL_CLASS_ID)
buffer.write_bool(obj)
return
elif cls is float:
buffer.write_varuint32(FLOAT64_CLASS_ID)
buffer.write_double(obj)
return
cdef ClassInfo classinfo = self.class_resolver.get_classinfo(cls)
self.class_resolver.write_typeinfo(buffer, classinfo)
classinfo.serializer.write(buffer, obj)
cpdef inline xserialize_ref(
self, Buffer buffer, obj, Serializer serializer=None):
if serializer is None or serializer.need_to_write_ref:
if not self.ref_resolver.write_ref_or_null(buffer, obj):
self.xserialize_nonref(
buffer, obj, serializer=serializer
)
else:
if obj is None:
buffer.write_int8(NULL_FLAG)
else:
buffer.write_int8(NOT_NULL_VALUE_FLAG)
self.xserialize_nonref(
buffer, obj, serializer=serializer
)
cpdef inline xserialize_nonref(
self, Buffer buffer, obj, Serializer serializer=None):
if serializer is None:
classinfo = self.class_resolver.get_classinfo(type(obj))
self.class_resolver.write_typeinfo(buffer, classinfo)
serializer = classinfo.serializer
serializer.xwrite(buffer, obj)
def deserialize(
self,
buffer: Union[Buffer, bytes],
buffers: Iterable = None,
unsupported_objects: Iterable = None,
):
try:
if type(buffer) == bytes:
buffer = Buffer(buffer)
return self._deserialize(buffer, buffers, unsupported_objects)
finally:
self.reset_read()
cpdef inline _deserialize(
self, Buffer buffer, buffers=None, unsupported_objects=None):
if not self.require_class_registration:
self.unpickler = Unpickler(buffer)
if unsupported_objects is not None:
self._unsupported_objects = iter(unsupported_objects)
if self.language == Language.XLANG:
magic_numer = buffer.read_int16()
assert magic_numer == MAGIC_NUMBER, (
f"The fury xlang serialization must start with magic number {hex(MAGIC_NUMBER)}. "
"Please check whether the serialization is based on the xlang protocol and the "
"data didn't corrupt."
)
cdef int32_t reader_index = buffer.reader_index
buffer.reader_index = reader_index + 1
if get_bit(buffer, reader_index, 0):
return None
cdef c_bool is_little_endian_ = get_bit(buffer, reader_index, 1)
assert is_little_endian_, (
"Big endian is not supported for now, "
"please ensure peer machine is little endian."
)
cdef c_bool is_target_x_lang = get_bit(buffer, reader_index, 2)
if is_target_x_lang:
self._peer_language = Language(buffer.read_int8())
else:
self._peer_language = Language.PYTHON
cdef c_bool is_out_of_band_serialization_enabled = \
get_bit(buffer, reader_index, 3)
if is_out_of_band_serialization_enabled:
assert buffers is not None, (
"buffers shouldn't be null when the serialized stream is "
"produced with buffer_callback not null."
)
self._buffers = iter(buffers)
else:
assert buffers is None, (
"buffers should be null when the serialized stream is "
"produced with buffer_callback null."
)
if not is_target_x_lang:
return self.deserialize_ref(buffer)
return self.xdeserialize_ref(buffer)
cpdef inline deserialize_ref(self, Buffer buffer):
cdef MapRefResolver ref_resolver = self.ref_resolver
cdef int32_t ref_id = ref_resolver.try_preserve_ref_id(buffer)
if ref_id < NOT_NULL_VALUE_FLAG:
return ref_resolver.get_read_object()
# indicates that the object is first read.
cdef ClassInfo classinfo = self.class_resolver.read_typeinfo(buffer)
cls = classinfo.cls
if cls is str:
return buffer.read_string()
elif cls is int:
return buffer.read_varint64()
elif cls is bool:
return buffer.read_bool()
elif cls is float:
return buffer.read_double()
o = classinfo.serializer.read(buffer)
ref_resolver.set_read_object(ref_id, o)
return o
cpdef inline deserialize_nonref(self, Buffer buffer):
"""Deserialize not-null and non-reference object from buffer."""
cdef ClassInfo classinfo = self.class_resolver.read_typeinfo(buffer)
cls = classinfo.cls
if cls is str:
return buffer.read_string()
elif cls is int:
return buffer.read_varint64()
elif cls is bool:
return buffer.read_bool()
elif cls is float:
return buffer.read_double()
return classinfo.serializer.read(buffer)
cpdef inline xdeserialize_ref(self, Buffer buffer, Serializer serializer=None):
cdef MapRefResolver ref_resolver
cdef int32_t ref_id
if serializer is None or serializer.need_to_write_ref:
ref_resolver = self.ref_resolver
ref_id = ref_resolver.try_preserve_ref_id(buffer)
# indicates that the object is first read.
if ref_id >= NOT_NULL_VALUE_FLAG:
o = self.xdeserialize_nonref(
buffer, serializer=serializer
)
ref_resolver.set_read_object(ref_id, o)
return o
else:
return ref_resolver.get_read_object()
cdef int8_t head_flag = buffer.read_int8()
if head_flag == NULL_FLAG:
return None
return self.xdeserialize_nonref(
buffer, serializer=serializer
)
cpdef inline xdeserialize_nonref(
self, Buffer buffer, Serializer serializer=None):
if serializer is None:
serializer = self.class_resolver.read_typeinfo(buffer).serializer
return serializer.xread(buffer)
cpdef inline write_buffer_object(self, Buffer buffer, buffer_object):
if self._buffer_callback is not None and self._buffer_callback(buffer_object):
buffer.write_bool(False)
return
buffer.write_bool(True)
cdef int32_t size = buffer_object.total_bytes()
# writer length.
buffer.write_varuint32(size)
cdef int32_t writer_index = buffer.writer_index
buffer.ensure(writer_index + size)
cdef Buffer buf = buffer.slice(buffer.writer_index, size)
buffer_object.write_to(buf)
buffer.writer_index += size
cpdef inline Buffer read_buffer_object(self, Buffer buffer):
cdef c_bool in_band = buffer.read_bool()
if not in_band:
assert self._buffers is not None
return next(self._buffers)
cdef int32_t size = buffer.read_varuint32()
cdef Buffer buf = buffer.slice(buffer.reader_index, size)
buffer.reader_index += size
return buf
cpdef inline handle_unsupported_write(self, Buffer buffer, obj):
if self._unsupported_callback is None or self._unsupported_callback(obj):
buffer.write_bool(True)
self.pickler.dump(obj)
else:
buffer.write_bool(False)
cpdef inline handle_unsupported_read(self, Buffer buffer):
cdef c_bool in_band = buffer.read_bool()
if in_band:
if self.unpickler is None:
self.unpickler.buffer = Unpickler(buffer)
return self.unpickler.load()
else:
assert self._unsupported_objects is not None
return next(self._unsupported_objects)
cpdef inline write_ref_pyobject(
self, Buffer buffer, value, ClassInfo classinfo=None):
if self.ref_resolver.write_ref_or_null(buffer, value):
return
if classinfo is None:
classinfo = self.class_resolver.get_classinfo(type(value))
self.class_resolver.write_typeinfo(buffer, classinfo)
classinfo.serializer.write(buffer, value)
cpdef inline read_ref_pyobject(self, Buffer buffer):
cdef MapRefResolver ref_resolver = self.ref_resolver
cdef int32_t ref_id = ref_resolver.try_preserve_ref_id(buffer)
if ref_id < NOT_NULL_VALUE_FLAG:
return ref_resolver.get_read_object()
# indicates that the object is first read.
cdef ClassInfo classinfo = self.class_resolver.read_typeinfo(buffer)
o = classinfo.serializer.read(buffer)
ref_resolver.set_read_object(ref_id, o)
return o
cpdef inline reset_write(self):
self.ref_resolver.reset_write()
self.class_resolver.reset_write()
self.metastring_resolver.reset_write()
self.serialization_context.reset()
self.pickler.clear_memo()
self._unsupported_callback = None
cpdef inline reset_read(self):
self.ref_resolver.reset_read()
self.class_resolver.reset_read()
self.metastring_resolver.reset_read()
self.serialization_context.reset()
self._buffers = None
self.unpickler = None
self._unsupported_objects = None
cpdef inline reset(self):
self.reset_write()
self.reset_read()
cpdef inline write_nullable_pybool(Buffer buffer, value):
if value is None:
buffer.write_int8(NULL_FLAG)
else:
buffer.write_int8(NOT_NULL_VALUE_FLAG)
buffer.write_bool(value)
cpdef inline write_nullable_pyint64(Buffer buffer, value):
if value is None:
buffer.write_int8(NULL_FLAG)
else:
buffer.write_int8(NOT_NULL_VALUE_FLAG)
buffer.write_varint64(value)
cpdef inline write_nullable_pyfloat64(Buffer buffer, value):
if value is None:
buffer.write_int8(NULL_FLAG)
else:
buffer.write_int8(NOT_NULL_VALUE_FLAG)
buffer.write_double(value)
cpdef inline write_nullable_pystr(Buffer buffer, value):
if value is None:
buffer.write_int8(NULL_FLAG)
else:
buffer.write_int8(NOT_NULL_VALUE_FLAG)
buffer.write_string(value)
cpdef inline read_nullable_pybool(Buffer buffer):
if buffer.read_int8() == NOT_NULL_VALUE_FLAG:
return buffer.read_bool()
else:
return None
cpdef inline read_nullable_pyint64(Buffer buffer):
if buffer.read_int8() == NOT_NULL_VALUE_FLAG:
return buffer.read_varint64()
else:
return None
cpdef inline read_nullable_pyfloat64(Buffer buffer):
if buffer.read_int8() == NOT_NULL_VALUE_FLAG:
return buffer.read_double()
else:
return None
cpdef inline read_nullable_pystr(Buffer buffer):
if buffer.read_int8() == NOT_NULL_VALUE_FLAG:
return buffer.read_string()
else:
return None
@cython.final
cdef class SerializationContext:
cdef dict objects
def __init__(self):
self.objects = dict()
def add(self, key, obj):
self.objects[id(key)] = obj
def __contains__(self, key):
return id(key) in self.objects
def __getitem__(self, key):
return self.objects[id(key)]
def get(self, key):
return self.objects.get(id(key))
def reset(self):
if len(self.objects) > 0:
self.objects.clear()
cdef class Serializer:
cdef readonly Fury fury
cdef readonly object type_
cdef public c_bool need_to_write_ref
def __init__(self, fury, type_: Union[type, TypeVar]):
self.fury = fury
self.type_ = type_
self.need_to_write_ref = not is_primitive_type(type_)
cpdef write(self, Buffer buffer, value):
raise NotImplementedError(f"write method not implemented in {type(self)}")
cpdef read(self, Buffer buffer):
raise NotImplementedError(f"read method not implemented in {type(self)}")
cpdef xwrite(self, Buffer buffer, value):
raise NotImplementedError(f"xwrite method not implemented in {type(self)}")
cpdef xread(self, Buffer buffer):
raise NotImplementedError(f"xread method not implemented in {type(self)}")
@classmethod
def support_subclass(cls) -> bool:
return False
cdef class CrossLanguageCompatibleSerializer(Serializer):
cpdef xwrite(self, Buffer buffer, value):
self.write(buffer, value)
cpdef xread(self, Buffer buffer):
return self.read(buffer)
@cython.final
cdef class BooleanSerializer(CrossLanguageCompatibleSerializer):
cpdef inline write(self, Buffer buffer, value):
buffer.write_bool(value)
cpdef inline read(self, Buffer buffer):
return buffer.read_bool()
@cython.final
cdef class ByteSerializer(CrossLanguageCompatibleSerializer):
cpdef inline write(self, Buffer buffer, value):
buffer.write_int8(value)
cpdef inline read(self, Buffer buffer):
return buffer.read_int8()
@cython.final
cdef class Int16Serializer(CrossLanguageCompatibleSerializer):
cpdef inline write(self, Buffer buffer, value):
buffer.write_int16(value)
cpdef inline read(self, Buffer buffer):
return buffer.read_int16()
@cython.final
cdef class Int32Serializer(CrossLanguageCompatibleSerializer):
cpdef inline write(self, Buffer buffer, value):
buffer.write_varint32(value)
cpdef inline read(self, Buffer buffer):
return buffer.read_varint32()
@cython.final
cdef class Int64Serializer(CrossLanguageCompatibleSerializer):
cpdef inline xwrite(self, Buffer buffer, value):
buffer.write_varint64(value)
cpdef inline xread(self, Buffer buffer):
return buffer.read_varint64()
cpdef inline write(self, Buffer buffer, value):
buffer.write_varint64(value)
cpdef inline read(self, Buffer buffer):
return buffer.read_varint64()
cdef int64_t INT8_MIN_VALUE = -1 << 7
cdef int64_t INT8_MAX_VALUE = 1 << 7 - 1
cdef int64_t INT16_MIN_VALUE = -1 << 15
cdef int64_t INT16_MAX_VALUE = 1 << 15 - 1
cdef int64_t INT32_MIN_VALUE = -1 << 31
cdef int64_t INT32_MAX_VALUE = 1 << 31 - 1
cdef float FLOAT32_MIN_VALUE = 1.17549e-38
cdef float FLOAT32_MAX_VALUE = 3.40282e+38
@cython.final
cdef class Float32Serializer(CrossLanguageCompatibleSerializer):
cpdef inline write(self, Buffer buffer, value):
buffer.write_float(value)
cpdef inline read(self, Buffer buffer):
return buffer.read_float()
@cython.final
cdef class Float64Serializer(CrossLanguageCompatibleSerializer):
cpdef inline write(self, Buffer buffer, value):
buffer.write_double(value)
cpdef inline read(self, Buffer buffer):
return buffer.read_double()
@cython.final
cdef class StringSerializer(CrossLanguageCompatibleSerializer):
def __init__(self, fury, type_, track_ref=False):
super().__init__(fury, type_)
self.need_to_write_ref = track_ref
cpdef inline write(self, Buffer buffer, value):
buffer.write_string(value)
cpdef inline read(self, Buffer buffer):
return buffer.read_string()
cdef _base_date = datetime.date(1970, 1, 1)
@cython.final
cdef class DateSerializer(CrossLanguageCompatibleSerializer):
cpdef inline write(self, Buffer buffer, value):
if type(value) is not datetime.date:
raise TypeError(
"{} should be {} instead of {}".format(
value, datetime.date, type(value)
)
)
days = (value - _base_date).days
buffer.write_int32(days)
cpdef inline read(self, Buffer buffer):
days = buffer.read_int32()
return _base_date + datetime.timedelta(days=days)
@cython.final
cdef class TimestampSerializer(CrossLanguageCompatibleSerializer):
cdef bint win_platform
def __init__(self, fury, type_: Union[type, TypeVar]):
super().__init__(fury, type_)
self.win_platform = platform.system() == "Windows"
cdef inline _get_timestamp(self, value):
seconds_offset = 0
if self.win_platform and value.tzinfo is None:
is_dst = time.daylight and time.localtime().tm_isdst > 0
seconds_offset = time.altzone if is_dst else time.timezone
value = value.replace(tzinfo=datetime.timezone.utc)
return int((value.timestamp() + seconds_offset) * 1000000)
cpdef inline write(self, Buffer buffer, value):
if type(value) is not datetime.datetime:
raise TypeError(
"{} should be {} instead of {}".format(value, datetime, type(value))
)
# TimestampType represent micro seconds
buffer.write_int64(self._get_timestamp(value))
cpdef inline read(self, Buffer buffer):
ts = buffer.read_int64() / 1000000
# TODO support timezone
return datetime.datetime.fromtimestamp(ts)
"""
Collection serialization format:
https://fury.apache.org/docs/specification/fury_xlang_serialization_spec/#list
Has the following changes:
* None has an independent NonType type, so COLLECTION_NOT_SAME_TYPE can also cover the concept of being nullable.
* No flag is needed to indicate that the element type is not the declared type.
"""
cdef int8_t COLLECTION_DEFAULT_FLAG = 0b0
cdef int8_t COLLECTION_TRACKING_REF = 0b1
cdef int8_t COLLECTION_HAS_NULL = 0b10
cdef int8_t COLLECTION_NOT_DECL_ELEMENT_TYPE = 0b100
cdef int8_t COLLECTION_NOT_SAME_TYPE = 0b1000
cdef class CollectionSerializer(Serializer):
cdef ClassResolver class_resolver
cdef MapRefResolver ref_resolver
cdef Serializer elem_serializer
cdef c_bool is_py
cdef int8_t elem_tracking_ref
cdef elem_type
cdef ClassInfo elem_typeinfo
def __init__(self, fury, type_, elem_serializer=None):
super().__init__(fury, type_)
self.class_resolver = fury.class_resolver
self.ref_resolver = fury.ref_resolver
self.elem_serializer = elem_serializer
if elem_serializer is None:
self.elem_type = None
self.elem_typeinfo = self.class_resolver.get_classinfo(None)
self.elem_tracking_ref = -1
else:
self.elem_type = elem_serializer.type_
self.elem_typeinfo = fury.class_resolver.get_classinfo(self.elem_type)
self.elem_tracking_ref = <int8_t> (elem_serializer.need_to_write_ref)
self.is_py = fury.is_py
cdef pair[int8_t, int64_t] write_header(self, Buffer buffer, value):
cdef int8_t collect_flag = COLLECTION_DEFAULT_FLAG
elem_type = self.elem_type
cdef ClassInfo elem_typeinfo = self.elem_typeinfo
cdef c_bool has_null = False
cdef c_bool has_different_type = False
if elem_type is None:
collect_flag = COLLECTION_NOT_DECL_ELEMENT_TYPE
for s in value:
if not has_null and s is None:
has_null = True
continue
if elem_type is None:
elem_type = type(s)
elif not has_different_type and type(s) is not elem_type:
collect_flag |= COLLECTION_NOT_SAME_TYPE
has_different_type = True
if not has_different_type:
elem_typeinfo = self.class_resolver.get_classinfo(elem_type)
else:
for s in value:
if s is None:
has_null = True
break
if has_null:
collect_flag |= COLLECTION_HAS_NULL
if self.fury.ref_tracking:
if self.elem_tracking_ref == 1:
collect_flag |= COLLECTION_TRACKING_REF
elif self.elem_tracking_ref == -1:
if has_different_type or elem_typeinfo.serializer.need_to_write_ref:
collect_flag |= COLLECTION_TRACKING_REF
buffer.write_varuint32(len(value))
buffer.write_int8(collect_flag)
if (not has_different_type and
collect_flag & COLLECTION_NOT_DECL_ELEMENT_TYPE != 0):
self.class_resolver.write_typeinfo(buffer, elem_typeinfo)
return pair[int8_t, int64_t](collect_flag, obj2int(elem_typeinfo))
cpdef write(self, Buffer buffer, value):
if len(value) == 0:
buffer.write_varuint64(0)
return
cdef pair[int8_t, int64_t] header_pair = self.write_header(buffer, value)
cdef int8_t collect_flag = header_pair.first
cdef int64_t elem_typeinfo_ptr = header_pair.second
cdef ClassInfo elem_typeinfo = <type> int2obj(elem_typeinfo_ptr)
cdef elem_type = elem_typeinfo.cls
cdef MapRefResolver ref_resolver = self.ref_resolver
cdef ClassResolver class_resolver = self.class_resolver
cdef c_bool is_py = self.is_py
cdef serializer = type(elem_typeinfo.serializer)
if (collect_flag & COLLECTION_NOT_SAME_TYPE) == 0:
if elem_type is str:
self._write_string(buffer, value)
elif serializer is Int64Serializer:
self._write_int(buffer, value)
elif elem_type is bool:
self._write_bool(buffer, value)
elif serializer is Float64Serializer:
self._write_float(buffer, value)
else:
if (collect_flag & COLLECTION_TRACKING_REF) == 0:
self._write_same_type_no_ref(buffer, value, elem_typeinfo)
else:
self._write_same_type_ref(buffer, value, elem_typeinfo)
else:
for s in value:
cls = type(s)
if cls is str:
buffer.write_int16(NOT_NULL_STRING_FLAG)
buffer.write_string(s)
elif cls is int:
buffer.write_int16(NOT_NULL_INT64_FLAG)
buffer.write_varint64(s)
elif cls is bool:
buffer.write_int16(NOT_NULL_BOOL_FLAG)
buffer.write_bool(s)
elif cls is float:
buffer.write_int16(NOT_NULL_FLOAT64_FLAG)
buffer.write_double(s)
else:
if not ref_resolver.write_ref_or_null(buffer, s):
classinfo = class_resolver.get_classinfo(cls)
class_resolver.write_typeinfo(buffer, classinfo)
if is_py:
classinfo.serializer.write(buffer, s)
else:
classinfo.serializer.xwrite(buffer, s)
cdef inline _write_string(self, Buffer buffer, value):
for s in value:
buffer.write_string(s)
cdef inline _read_string(self, Buffer buffer, int64_t len_, object collection_):
for i in range(len_):
self._add_element(collection_, i, buffer.read_string())
cdef inline _write_int(self, Buffer buffer, value):
for s in value:
buffer.write_varint64(s)
cdef inline _read_int(self, Buffer buffer, int64_t len_, object collection_):
for i in range(len_):
self._add_element(collection_, i, buffer.read_varint64())
cdef inline _write_bool(self, Buffer buffer, value):
value_type = type(value)
if value_type is list or value_type is tuple:
size = sizeof(bool) * Py_SIZE(value)
buffer.grow(<int32_t>size)
Fury_PyBooleanSequenceWriteToBuffer(value, buffer.c_buffer.get(), buffer.writer_index)
buffer.writer_index += size
else:
for s in value:
buffer.write_bool(s)
cdef inline _read_bool(self, Buffer buffer, int64_t len_, object collection_):
for i in range(len_):
self._add_element(collection_, i, buffer.read_bool())
cdef inline _write_float(self, Buffer buffer, value):
value_type = type(value)
if value_type is list or value_type is tuple:
size = sizeof(double) * Py_SIZE(value)
buffer.grow(<int32_t>size)
Fury_PyFloatSequenceWriteToBuffer(value, buffer.c_buffer.get(), buffer.writer_index)
buffer.writer_index += size
else:
for s in value:
buffer.write_double(s)
cdef inline _read_float(self, Buffer buffer, int64_t len_, object collection_):
for i in range(len_):
self._add_element(collection_, i, buffer.read_double())
cpdef _write_same_type_no_ref(self, Buffer buffer, value, ClassInfo classinfo):
cdef MapRefResolver ref_resolver = self.ref_resolver
cdef ClassResolver class_resolver = self.class_resolver
if self.is_py:
for s in value:
classinfo.serializer.write(buffer, s)
else:
for s in value:
classinfo.serializer.xwrite(buffer, s)
cpdef _read_same_type_no_ref(self, Buffer buffer, int64_t len_, object collection_, ClassInfo classinfo):
cdef MapRefResolver ref_resolver = self.ref_resolver
cdef ClassResolver class_resolver = self.class_resolver
if self.is_py:
for i in range(len_):
obj = classinfo.serializer.read(buffer)
self._add_element(collection_, i, obj)
else:
for i in range(len_):
obj = classinfo.serializer.xread(buffer)
self._add_element(collection_, i, obj)
cpdef _write_same_type_ref(self, Buffer buffer, value, ClassInfo classinfo):
cdef MapRefResolver ref_resolver = self.ref_resolver
cdef ClassResolver class_resolver = self.class_resolver
if self.is_py:
for s in value:
if not ref_resolver.write_ref_or_null(buffer, s):
classinfo.serializer.write(buffer, s)
else:
for s in value:
if not ref_resolver.write_ref_or_null(buffer, s):
classinfo.serializer.xwrite(buffer, s)
cpdef _read_same_type_ref(self, Buffer buffer, int64_t len_, object collection_, ClassInfo classinfo):
cdef MapRefResolver ref_resolver = self.ref_resolver
cdef ClassResolver class_resolver = self.class_resolver
cdef c_bool is_py = self.is_py
for i in range(len_):
ref_id = ref_resolver.try_preserve_ref_id(buffer)
if ref_id < NOT_NULL_VALUE_FLAG:
obj = ref_resolver.get_read_object()
else:
if is_py:
obj = classinfo.serializer.read(buffer)
else:
obj = classinfo.serializer.xread(buffer)
ref_resolver.set_read_object(ref_id, obj)
self._add_element(collection_, i, obj)
cpdef _add_element(self, object collection_, int64_t index, object element):
raise NotImplementedError
cpdef xwrite(self, Buffer buffer, value):
self.write(buffer, value)
cdef class ListSerializer(CollectionSerializer):
cpdef read(self, Buffer buffer):
cdef MapRefResolver ref_resolver = self.fury.ref_resolver
cdef ClassResolver class_resolver = self.fury.class_resolver
cdef int32_t len_ = buffer.read_varuint32()
cdef list list_ = PyList_New(len_)
if len_ == 0:
return list_
cdef int8_t collect_flag = buffer.read_int8()
ref_resolver.reference(list_)
cdef c_bool is_py = self.is_py
cdef ClassInfo classinfo
cdef int32_t type_id = -1
if (collect_flag & COLLECTION_NOT_SAME_TYPE) == 0:
if collect_flag & COLLECTION_NOT_DECL_ELEMENT_TYPE != 0:
classinfo = self.class_resolver.read_typeinfo(buffer)
else:
classinfo = self.elem_typeinfo
if (collect_flag & COLLECTION_HAS_NULL) == 0:
type_id = classinfo.type_id
if type_id == <int32_t>TypeId.STRING:
self._read_string(buffer, len_, list_)
return list_
elif type_id == <int32_t>TypeId.VAR_INT64:
self._read_int(buffer, len_, list_)
return list_
elif type_id == <int32_t>TypeId.BOOL:
self._read_bool(buffer, len_, list_)
return list_
elif type_id == <int32_t>TypeId.FLOAT64:
self._read_float(buffer, len_, list_)
return list_
if (collect_flag & COLLECTION_TRACKING_REF) == 0:
self._read_same_type_no_ref(buffer, len_, list_, classinfo)
else:
self._read_same_type_ref(buffer, len_, list_, classinfo)
else:
for i in range(len_):
elem = get_next_element(buffer, ref_resolver, class_resolver, is_py)
Py_INCREF(elem)
PyList_SET_ITEM(list_, i, elem)
return list_
cpdef _add_element(self, object collection_, int64_t index, object element):
Py_INCREF(element)
PyList_SET_ITEM(collection_, index, element)
cpdef xread(self, Buffer buffer):
return self.read(buffer)
cdef inline get_next_element(
Buffer buffer,
MapRefResolver ref_resolver,
ClassResolver class_resolver,
c_bool is_py,
):
cdef int32_t ref_id
cdef ClassInfo classinfo
ref_id = ref_resolver.try_preserve_ref_id(buffer)
if ref_id < NOT_NULL_VALUE_FLAG:
return ref_resolver.get_read_object()
# indicates that the object is first read.
classinfo = class_resolver.read_typeinfo(buffer)
cdef int32_t type_id = classinfo.type_id
# Note that all read operations in fast paths of list/tuple/set/dict/sub_dict
# ust match corresponding writing operations. Otherwise, ref tracking will
# error.
if type_id == <int32_t>TypeId.STRING:
return buffer.read_string()
elif type_id == <int32_t>TypeId.VAR_INT32:
return buffer.read_varint64()
elif type_id == <int32_t>TypeId.BOOL:
return buffer.read_bool()
elif type_id == <int32_t>TypeId.FLOAT64:
return buffer.read_double()
else:
if is_py:
o = classinfo.serializer.read(buffer)
else:
o = classinfo.serializer.xread(buffer)
ref_resolver.set_read_object(ref_id, o)
return o
@cython.final
cdef class TupleSerializer(CollectionSerializer):
cpdef inline read(self, Buffer buffer):
cdef MapRefResolver ref_resolver = self.fury.ref_resolver
cdef ClassResolver class_resolver = self.fury.class_resolver
cdef int32_t len_ = buffer.read_varuint32()
cdef tuple tuple_ = PyTuple_New(len_)
if len_ == 0:
return tuple_
cdef int8_t collect_flag = buffer.read_int8()
cdef c_bool is_py = self.is_py
cdef ClassInfo classinfo
cdef int32_t type_id = -1
if (collect_flag & COLLECTION_NOT_SAME_TYPE) == 0:
if collect_flag & COLLECTION_NOT_DECL_ELEMENT_TYPE != 0:
classinfo = self.class_resolver.read_typeinfo(buffer)
else:
classinfo = self.elem_typeinfo
if (collect_flag & COLLECTION_HAS_NULL) == 0:
type_id = classinfo.type_id
if type_id == <int32_t>TypeId.STRING:
self._read_string(buffer, len_, tuple_)
return tuple_
if type_id == <int32_t>TypeId.VAR_INT64:
self._read_int(buffer, len_, tuple_)
return tuple_
if type_id == <int32_t>TypeId.BOOL:
self._read_bool(buffer, len_, tuple_)
return tuple_
if type_id == <int32_t>TypeId.FLOAT64:
self._read_float(buffer, len_, tuple_)
return tuple_
if (collect_flag & COLLECTION_TRACKING_REF) == 0:
self._read_same_type_no_ref(buffer, len_, tuple_, classinfo)
else:
self._read_same_type_ref(buffer, len_, tuple_, classinfo)
else:
for i in range(len_):
elem = get_next_element(buffer, ref_resolver, class_resolver, is_py)
Py_INCREF(elem)
PyTuple_SET_ITEM(tuple_, i, elem)
return tuple_
cpdef inline _add_element(self, object collection_, int64_t index, object element):
Py_INCREF(element)
PyTuple_SET_ITEM(collection_, index, element)
cpdef inline xread(self, Buffer buffer):
return self.read(buffer)
@cython.final
cdef class StringArraySerializer(ListSerializer):
def __init__(self, fury, type_):
super().__init__(fury, type_, StringSerializer(fury, str))
@cython.final
cdef class SetSerializer(CollectionSerializer):
cpdef inline read(self, Buffer buffer):
cdef MapRefResolver ref_resolver = self.fury.ref_resolver
cdef ClassResolver class_resolver = self.fury.class_resolver
cdef set instance = set()
ref_resolver.reference(instance)
cdef int32_t len_ = buffer.read_varuint32()
if len_ == 0:
return instance
cdef int8_t collect_flag = buffer.read_int8()
cdef int32_t ref_id
cdef ClassInfo classinfo
cdef int32_t type_id = -1
cdef c_bool is_py = self.is_py
if (collect_flag & COLLECTION_NOT_SAME_TYPE) == 0:
if collect_flag & COLLECTION_NOT_DECL_ELEMENT_TYPE != 0:
classinfo = self.class_resolver.read_typeinfo(buffer)
else:
classinfo = self.elem_typeinfo
if (collect_flag & COLLECTION_HAS_NULL) == 0:
type_id = classinfo.type_id
if type_id == <int32_t>TypeId.STRING:
self._read_string(buffer, len_, instance)
return instance
if type_id == <int32_t>TypeId.VAR_INT64:
self._read_int(buffer, len_, instance)
return instance
if type_id == <int32_t>TypeId.BOOL:
self._read_bool(buffer, len_, instance)
return instance
if type_id == <int32_t>TypeId.FLOAT64:
self._read_float(buffer, len_, instance)
return instance
if (collect_flag & COLLECTION_TRACKING_REF) == 0:
self._read_same_type_no_ref(buffer, len_, instance, classinfo)
else:
self._read_same_type_ref(buffer, len_, instance, classinfo)
else:
for i in range(len_):
ref_id = ref_resolver.try_preserve_ref_id(buffer)
if ref_id < NOT_NULL_VALUE_FLAG:
instance.add(ref_resolver.get_read_object())
continue
# indicates that the object is first read.
classinfo = class_resolver.read_typeinfo(buffer)
type_id = classinfo.type_id
if type_id == <int32_t>TypeId.STRING:
instance.add(buffer.read_string())
elif type_id == <int32_t>TypeId.VAR_INT64:
instance.add(buffer.read_varint64())
elif type_id == <int32_t>TypeId.BOOL:
instance.add(buffer.read_bool())
elif type_id == <int32_t>TypeId.FLOAT64:
instance.add(buffer.read_double())
else:
if is_py:
o = classinfo.serializer.read(buffer)
else:
o = classinfo.serializer.xread(buffer)
ref_resolver.set_read_object(ref_id, o)
instance.add(o)
return instance
cpdef inline _add_element(self, object collection_, int64_t index, object element):
collection_.add(element)
cpdef inline xread(self, Buffer buffer):
return self.read(buffer)
cdef int32_t MAX_CHUNK_SIZE = 255
# Whether track key ref.
cdef int32_t TRACKING_KEY_REF = 0b1
# Whether key has null.
cdef int32_t KEY_HAS_NULL = 0b10
# Whether key is not declare type.
cdef int32_t KEY_DECL_TYPE = 0b100
# Whether track value ref.
cdef int32_t TRACKING_VALUE_REF = 0b1000
# Whether value has null.
cdef int32_t VALUE_HAS_NULL = 0b10000
# Whether value is not declare type.
cdef int32_t VALUE_DECL_TYPE = 0b100000
# When key or value is null that entry will be serialized as a new chunk with size 1.
# In such cases, chunk size will be skipped writing.
# Both key and value are null.
cdef int32_t KV_NULL = KEY_HAS_NULL | VALUE_HAS_NULL
# Key is null, value type is declared type, and ref tracking for value is disabled.
cdef int32_t NULL_KEY_VALUE_DECL_TYPE = KEY_HAS_NULL | VALUE_DECL_TYPE
# Key is null, value type is declared type, and ref tracking for value is enabled.
cdef int32_t NULL_KEY_VALUE_DECL_TYPE_TRACKING_REF =KEY_HAS_NULL | VALUE_DECL_TYPE | TRACKING_VALUE_REF
# Value is null, key type is declared type, and ref tracking for key is disabled.
cdef int32_t NULL_VALUE_KEY_DECL_TYPE = VALUE_HAS_NULL | KEY_DECL_TYPE
# Value is null, key type is declared type, and ref tracking for key is enabled.
cdef int32_t NULL_VALUE_KEY_DECL_TYPE_TRACKING_REF = VALUE_HAS_NULL | KEY_DECL_TYPE | TRACKING_VALUE_REF
@cython.final
cdef class MapSerializer(Serializer):
cdef ClassResolver class_resolver
cdef MapRefResolver ref_resolver
cdef Serializer key_serializer
cdef Serializer value_serializer
cdef c_bool is_py
def __init__(self, fury, type_, key_serializer=None, value_serializer=None):
super().__init__(fury, type_)
self.class_resolver = fury.class_resolver
self.ref_resolver = fury.ref_resolver
self.key_serializer = key_serializer
self.value_serializer = value_serializer
self.is_py = fury.is_py
cpdef inline write(self, Buffer buffer, o):
cdef dict obj = o
cdef int32_t length = len(obj)
buffer.write_varuint32(length)
if length == 0:
return
cdef int64_t key_addr, value_addr
cdef Py_ssize_t pos = 0
cdef Fury fury = self.fury
cdef ClassResolver class_resolver = fury.class_resolver
cdef MapRefResolver ref_resolver = fury.ref_resolver
cdef Serializer key_serializer = self.key_serializer
cdef Serializer value_serializer = self.value_serializer
cdef type key_cls, value_cls, key_serializer_type, value_serializer_type
cdef ClassInfo key_classinfo, value_classinfo
cdef int32_t chunk_size_offset, chunk_header, chunk_size
cdef c_bool key_write_ref, value_write_ref
cdef int has_next = PyDict_Next(obj, &pos, <PyObject **>&key_addr, <PyObject **>&value_addr)
cdef c_bool is_py = self.is_py
while has_next != 0:
key = int2obj(key_addr)
Py_INCREF(key)
value = int2obj(value_addr)
Py_INCREF(value)
while has_next != 0:
if key is not None:
if value is not None:
break
if key_serializer is not None:
if key_serializer.need_to_write_ref:
buffer.write_int8(NULL_VALUE_KEY_DECL_TYPE_TRACKING_REF)
if not self.ref_resolver.write_ref_or_null(buffer, key):
if is_py:
key_serializer.write(buffer, key)
else:
key_serializer.xwrite(buffer, key)
else:
buffer.write_int8(NULL_VALUE_KEY_DECL_TYPE)
if is_py:
key_serializer.write(buffer, key)
else:
key_serializer.xwrite(buffer, key)
else:
buffer.write_int8(VALUE_HAS_NULL | TRACKING_KEY_REF)
if is_py:
fury.serialize_ref(buffer, key)
else:
fury.xserialize_ref(buffer, key)
else:
if value is not None:
if value_serializer is not None:
if value_serializer.need_to_write_ref:
buffer.write_int8(NULL_KEY_VALUE_DECL_TYPE_TRACKING_REF)
if not self.ref_resolver.write_ref_or_null(buffer, value):
if is_py:
value_serializer.write(buffer, value)
else:
value_serializer.xwrite(buffer, value)
if not self.ref_resolver.write_ref_or_null(buffer, value):
if is_py:
value_serializer.write(buffer, value)
else:
value_serializer.xwrite(buffer, value)
else:
buffer.write_int8(NULL_KEY_VALUE_DECL_TYPE)
if is_py:
value_serializer.write(buffer, value)
else:
value_serializer.xwrite(buffer, value)
else:
buffer.write_int8(KEY_HAS_NULL | TRACKING_VALUE_REF)
if is_py:
fury.serialize_ref(buffer, value)
else:
fury.xserialize_ref(buffer, value)
else:
buffer.write_int8(KV_NULL)
has_next = PyDict_Next(obj, &pos, <PyObject **>&key_addr, <PyObject **>&value_addr)
key = int2obj(key_addr)
Py_INCREF(key)
value = int2obj(value_addr)
Py_INCREF(value)
if has_next == 0:
break
key_cls = type(key)
value_cls = type(value)
buffer.write_int16(-1)
chunk_size_offset = buffer.writer_index - 1
chunk_header = 0
if key_serializer is not None:
chunk_header |= KEY_DECL_TYPE
else:
key_classinfo = self.class_resolver.get_classinfo(key_cls)
class_resolver.write_typeinfo(buffer, key_classinfo)
key_serializer = key_classinfo.serializer
if value_serializer is not None:
chunk_header |= VALUE_DECL_TYPE
else:
value_classinfo = self.class_resolver.get_classinfo(value_cls)
class_resolver.write_typeinfo(buffer, value_classinfo)
value_serializer = value_classinfo.serializer
key_write_ref = key_serializer.need_to_write_ref
value_write_ref = value_serializer.need_to_write_ref
if key_write_ref:
chunk_header |= TRACKING_KEY_REF
if value_write_ref:
chunk_header |= TRACKING_VALUE_REF
buffer.put_int8(chunk_size_offset - 1, chunk_header)
key_serializer_type = type(key_serializer)
value_serializer_type = type(value_serializer)
chunk_size = 0
while True:
if (key is None or value is None or
type(key) is not key_cls or type(value) is not value_cls):
break
if not key_write_ref or not ref_resolver.write_ref_or_null(buffer, key):
if key_cls is str:
buffer.write_string(key)
elif key_serializer_type is Int64Serializer:
buffer.write_varint64(key)
elif key_serializer_type is Float64Serializer:
buffer.write_double(key)
elif key_serializer_type is Int32Serializer:
buffer.write_varint32(key)
elif key_serializer_type is Float32Serializer:
buffer.write_float(key)
else:
if is_py:
key_serializer.write(buffer, key)
else:
key_serializer.xwrite(buffer, key)
if not value_write_ref or not ref_resolver.write_ref_or_null(buffer, value):
if value_cls is str:
buffer.write_string(value)
elif value_serializer_type is Int64Serializer:
buffer.write_varint64(value)
elif value_serializer_type is Float64Serializer:
buffer.write_double(value)
elif value_serializer_type is Int32Serializer:
buffer.write_varint32(value)
elif value_serializer_type is Float32Serializer:
buffer.write_float(value)
elif value_serializer_type is BooleanSerializer:
buffer.write_bool(value)
else:
if is_py:
value_serializer.write(buffer, value)
else:
value_serializer.xwrite(buffer, value)
chunk_size += 1
has_next = PyDict_Next(obj, &pos, <PyObject **>&key_addr, <PyObject **>&value_addr)
if has_next == 0:
break
if chunk_size == MAX_CHUNK_SIZE:
break
key = int2obj(key_addr)
Py_INCREF(key)
value = int2obj(value_addr)
Py_INCREF(value)
key_serializer = self.key_serializer
value_serializer = self.value_serializer
buffer.put_int8(chunk_size_offset, chunk_size)
cpdef inline read(self, Buffer buffer):
cdef Fury fury = self.fury
cdef MapRefResolver ref_resolver = self.ref_resolver
cdef ClassResolver class_resolver = self.class_resolver
cdef int32_t size = buffer.read_varuint32()
cdef dict map_ = _PyDict_NewPresized(size)
ref_resolver.reference(map_)
cdef int32_t ref_id
cdef ClassInfo key_classinfo, value_classinfo
cdef int32_t chunk_header = 0
if size != 0:
chunk_header = buffer.read_uint8()
cdef Serializer key_serializer = self.key_serializer
cdef Serializer value_serializer = self.value_serializer
cdef c_bool key_has_null, value_has_null, track_key_ref, track_value_ref
cdef c_bool key_is_declared_type, value_is_declared_type
cdef type key_serializer_type, value_serializer_type
cdef int32_t chunk_size
cdef c_bool is_py = self.is_py
while size > 0:
while True:
key_has_null = (chunk_header & KEY_HAS_NULL) != 0
value_has_null = (chunk_header & VALUE_HAS_NULL) != 0
if not key_has_null:
if not value_has_null:
break
else:
track_key_ref = (chunk_header & TRACKING_KEY_REF) != 0
if (chunk_header & KEY_DECL_TYPE) != 0:
if track_key_ref:
ref_id = ref_resolver.try_preserve_ref_id(buffer)
if ref_id < NOT_NULL_VALUE_FLAG:
key = ref_resolver.get_read_object()
else:
if is_py:
key = key_serializer.read(buffer)
else:
key = key_serializer.xread(buffer)
ref_resolver.set_read_object(ref_id, key)
else:
if is_py:
key = key_serializer.read(buffer)
else:
key = key_serializer.xread(buffer)
else:
if is_py:
key = fury.deserialize_ref(buffer)
else:
key = fury.xdeserialize_ref(buffer)
map_[key] = None
else:
if not value_has_null:
track_value_ref = (chunk_header & TRACKING_VALUE_REF) != 0
if (chunk_header & VALUE_DECL_TYPE) != 0:
if track_value_ref:
ref_id = ref_resolver.try_preserve_ref_id(buffer)
if ref_id < NOT_NULL_VALUE_FLAG:
value = ref_resolver.get_read_object()
else:
if is_py:
value = value_serializer.read(buffer)
else:
value = value_serializer.xread(buffer)
ref_resolver.set_read_object(ref_id, value)
else:
if is_py:
value = fury.deserialize_ref(buffer)
else:
value = fury.xdeserialize_ref(buffer)
map_[None] = value
else:
map_[None] = None
size -= 1
if size == 0:
return map_
else:
chunk_header = buffer.read_uint8()
track_key_ref = (chunk_header & TRACKING_KEY_REF) != 0
track_value_ref = (chunk_header & TRACKING_VALUE_REF) != 0
key_is_declared_type = (chunk_header & KEY_DECL_TYPE) != 0
value_is_declared_type = (chunk_header & VALUE_DECL_TYPE) != 0
chunk_size = buffer.read_uint8()
if not key_is_declared_type:
key_serializer = class_resolver.read_typeinfo(buffer).serializer
if not value_is_declared_type:
value_serializer = class_resolver.read_typeinfo(buffer).serializer
key_serializer_type = type(key_serializer)
value_serializer_type = type(value_serializer)
for i in range(chunk_size):
if track_key_ref:
ref_id = ref_resolver.try_preserve_ref_id(buffer)
if ref_id < NOT_NULL_VALUE_FLAG:
key = ref_resolver.get_read_object()
else:
if is_py:
key = key_serializer.read(buffer)
else:
key = key_serializer.xread(buffer)
ref_resolver.set_read_object(ref_id, key)
else:
if key_serializer_type is StringSerializer:
key = buffer.read_string()
elif key_serializer_type is Int64Serializer:
key = buffer.read_varint64()
elif key_serializer_type is Float64Serializer:
key = buffer.read_double()
elif key_serializer_type is Int32Serializer:
key = buffer.read_varint32()
elif key_serializer_type is Float32Serializer:
key = buffer.read_float()
else:
if is_py:
key = key_serializer.read(buffer)
else:
key = key_serializer.xread(buffer)
if track_value_ref:
ref_id = ref_resolver.try_preserve_ref_id(buffer)
if ref_id < NOT_NULL_VALUE_FLAG:
value = ref_resolver.get_read_object()
else:
if is_py:
value = value_serializer.read(buffer)
else:
value = value_serializer.xread(buffer)
ref_resolver.set_read_object(ref_id, value)
else:
if value_serializer_type is StringSerializer:
value = buffer.read_string()
elif value_serializer_type is Int64Serializer:
value = buffer.read_varint64()
elif value_serializer_type is Float64Serializer:
value = buffer.read_double()
elif value_serializer_type is Int32Serializer:
value = buffer.read_varint32()
elif value_serializer_type is Float32Serializer:
value = buffer.read_float()
elif value_serializer_type is BooleanSerializer:
value = buffer.read_bool()
else:
if is_py:
value = value_serializer.read(buffer)
else:
value = value_serializer.xread(buffer)
map_[key] = value
size -= 1
if size != 0:
chunk_header = buffer.read_uint8()
return map_
cpdef inline xwrite(self, Buffer buffer, o):
self.write(buffer, o)
cpdef inline xread(self, Buffer buffer):
return self.read(buffer)
@cython.final
cdef class SubMapSerializer(Serializer):
cdef ClassResolver class_resolver
cdef MapRefResolver ref_resolver
cdef Serializer key_serializer
cdef Serializer value_serializer
def __init__(self, fury, type_, key_serializer=None, value_serializer=None):
super().__init__(fury, type_)
self.class_resolver = fury.class_resolver
self.ref_resolver = fury.ref_resolver
self.key_serializer = key_serializer
self.value_serializer = value_serializer
cpdef inline write(self, Buffer buffer, value):
buffer.write_varuint32(len(value))
cdef ClassInfo key_classinfo
cdef ClassInfo value_classinfo
for k, v in value.items():
key_cls = type(k)
if key_cls is str:
buffer.write_int16(NOT_NULL_STRING_FLAG)
buffer.write_string(k)
else:
if not self.ref_resolver.write_ref_or_null(buffer, k):
key_classinfo = self.class_resolver.get_classinfo(key_cls)
self.class_resolver.write_typeinfo(buffer, key_classinfo)
key_classinfo.serializer.write(buffer, k)
value_cls = type(v)
if value_cls is str:
buffer.write_int16(NOT_NULL_STRING_FLAG)
buffer.write_string(v)
elif value_cls is int:
buffer.write_int16(NOT_NULL_INT64_FLAG)
buffer.write_varint64(v)
elif value_cls is bool:
buffer.write_int16(NOT_NULL_BOOL_FLAG)
buffer.write_bool(v)
elif value_cls is float:
buffer.write_int16(NOT_NULL_FLOAT64_FLAG)
buffer.write_double(v)
else:
if not self.ref_resolver.write_ref_or_null(buffer, v):
value_classinfo = self.class_resolver. \
get_classinfo(value_cls)
self.class_resolver.write_typeinfo(buffer, value_classinfo)
value_classinfo.serializer.write(buffer, v)
cpdef inline read(self, Buffer buffer):
cdef MapRefResolver ref_resolver = self.fury.ref_resolver
cdef ClassResolver class_resolver = self.fury.class_resolver
map_ = self.type_()
ref_resolver.reference(map_)
cdef int32_t len_ = buffer.read_varuint32()
cdef int32_t ref_id
cdef ClassInfo key_classinfo
cdef ClassInfo value_classinfo
for i in range(len_):
ref_id = ref_resolver.try_preserve_ref_id(buffer)
if ref_id < NOT_NULL_VALUE_FLAG:
key = ref_resolver.get_read_object()
else:
key_classinfo = class_resolver.read_typeinfo(buffer)
if key_classinfo.cls is str:
key = buffer.read_string()
else:
key = key_classinfo.serializer.read(buffer)
ref_resolver.set_read_object(ref_id, key)
ref_id = ref_resolver.try_preserve_ref_id(buffer)
if ref_id < NOT_NULL_VALUE_FLAG:
value = ref_resolver.get_read_object()
else:
value_classinfo = class_resolver.read_typeinfo(buffer)
cls = value_classinfo.cls
if cls is str:
value = buffer.read_string()
elif cls is int:
value = buffer.read_varint64()
elif cls is bool:
value = buffer.read_bool()
elif cls is float:
value = buffer.read_double()
else:
value = value_classinfo.serializer.read(buffer)
ref_resolver.set_read_object(ref_id, value)
map_[key] = value
return map_
@cython.final
cdef class EnumSerializer(Serializer):
@classmethod
def support_subclass(cls) -> bool:
return True
cpdef inline write(self, Buffer buffer, value):
buffer.write_string(value.name)
cpdef inline read(self, Buffer buffer):
name = buffer.read_string()
return getattr(self.type_, name)
cpdef inline xwrite(self, Buffer buffer, value):
buffer.write_varuint32(value.value)
cpdef inline xread(self, Buffer buffer):
ordinal = buffer.read_varuint32()
return self.type_(ordinal)
@cython.final
cdef class SliceSerializer(Serializer):
cpdef inline write(self, Buffer buffer, v):
cdef slice value = v
start, stop, step = value.start, value.stop, value.step
if type(start) is int:
# TODO support varint128
buffer.write_int16(NOT_NULL_INT64_FLAG)
buffer.write_varint64(start)
else:
if start is None:
buffer.write_int8(NULL_FLAG)
else:
buffer.write_int8(NOT_NULL_VALUE_FLAG)
self.fury.serialize_nonref(buffer, start)
if type(stop) is int:
# TODO support varint128
buffer.write_int16(NOT_NULL_INT64_FLAG)
buffer.write_varint64(stop)
else:
if stop is None:
buffer.write_int8(NULL_FLAG)
else:
buffer.write_int8(NOT_NULL_VALUE_FLAG)
self.fury.serialize_nonref(buffer, stop)
if type(step) is int:
# TODO support varint128
buffer.write_int16(NOT_NULL_INT64_FLAG)
buffer.write_varint64(step)
else:
if step is None:
buffer.write_int8(NULL_FLAG)
else:
buffer.write_int8(NOT_NULL_VALUE_FLAG)
self.fury.serialize_nonref(buffer, step)
cpdef inline read(self, Buffer buffer):
if buffer.read_int8() == NULL_FLAG:
start = None
else:
start = self.fury.deserialize_nonref(buffer)
if buffer.read_int8() == NULL_FLAG:
stop = None
else:
stop = self.fury.deserialize_nonref(buffer)
if buffer.read_int8() == NULL_FLAG:
step = None
else:
step = self.fury.deserialize_nonref(buffer)
return slice(start, stop, step)
cpdef xwrite(self, Buffer buffer, value):
raise NotImplementedError
cpdef xread(self, Buffer buffer):
raise NotImplementedError