core/maxframe/serialization/core.pyx (744 lines of code) (raw):

# distutils: language = c++
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import datetime
import hashlib
import importlib
import re
from collections import OrderedDict
from functools import partial, wraps
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd
from cpython cimport PyObject
from libc.stdint cimport int32_t, int64_t, uint32_t, uint64_t, uintptr_t
from libcpp.unordered_map cimport unordered_map
from pandas.api.extensions import ExtensionDtype
from pandas.api.types import pandas_dtype

from .._utils import NamedType

from .._utils cimport TypeDispatcher

from ..lib import wrapped_pickle as pickle
from ..lib.dtypes_extension import ArrowDtype
from ..utils import NoDefault, arrow_type_from_str, no_default

# resolve pandas pickle compatibility between <1.2 and >=1.3
try:
    from pandas.core.internals import blocks as pd_blocks
    if not hasattr(pd_blocks, "new_block") and hasattr(pd_blocks, "make_block"):
        # register missing func that would cause errors
        pd_blocks.new_block = pd_blocks.make_block
except (ImportError, AttributeError):
    pass

# optional timezone backends: when a backend is missing, its class is
# replaced with NoneType so that isinstance() checks on tzinfo objects
# simply evaluate to False
try:
    import pytz
    from pytz import BaseTzInfo as PyTZ_BaseTzInfo
except ImportError:
    PyTZ_BaseTzInfo = type(None)
try:
    import zoneinfo
    from zoneinfo import ZoneInfo
except ImportError:
    ZoneInfo = type(None)

BUFFER_PICKLE_PROTOCOL = max(pickle.DEFAULT_PROTOCOL, 5)
cdef bint HAS_PICKLE_BUFFER = pickle.HIGHEST_PROTOCOL >= 5
cdef bint _PANDAS_HAS_MGR = hasattr(pd.Series([0]), "_mgr")
cdef bint _ARROW_DTYPE_NOT_SUPPORTED = ArrowDtype is None

# dispatcher mapping object types to serializer instances
cdef TypeDispatcher _serial_dispatcher = TypeDispatcher()
# mapping from serializer id to serializer instance used on deserialization
cdef dict _deserializers = dict()

# bytes / str items shorter than this are kept inline in collection headers
cdef uint32_t _MAX_STR_PRIMITIVE_LEN = 1024
# prime modulus for serializer ids
# use the largest prime number smaller than 32767
cdef int32_t _SERIALIZER_ID_PRIME = 32749

# ids for basic serializers
cdef:
    int PICKLE_SERIALIZER = 0
    int PRIMITIVE_SERIALIZER = 1
    int BYTES_SERIALIZER = 2
    int STR_SERIALIZER = 3
    int TUPLE_SERIALIZER = 4
    int LIST_SERIALIZER = 5
    int DICT_SERIALIZER = 6
    int PY_DATETIME_SERIALIZER = 7
    int PY_DATE_SERIALIZER = 8
    int PY_TIMEDELTA_SERIALIZER = 9
    int PY_TZINFO_SERIALIZER = 10
    int DTYPE_SERIALIZER = 11
    int COMPLEX_SERIALIZER = 12
    int SLICE_SERIALIZER = 13
    int REGEX_SERIALIZER = 14
    int NO_DEFAULT_SERIALIZER = 15
    int PLACEHOLDER_SERIALIZER = 4096

# cache of classes resolved by load_type(), keyed by class name
cdef dict _type_cache = dict()


cpdef object load_type(str class_name, object parent_class):
    """
    Resolve a class from its ``module#QualName`` name.

    Resolved classes are cached by ``class_name``. ``safe_load_type`` from
    the deserializer module is tried first; if that raises ImportError and
    unpickling is not forbidden, the module part is imported directly and
    the (possibly dotted) qualified name is looked up with getattr.

    Parameters
    ----------
    class_name: str
        Name in the form of ``module.path#Qualified.Name``
    parent_class: type
        Class the resolved class must be a subclass of

    Returns
    -------
    cls: type
        Resolved class

    Raises
    ------
    ImportError
        If the class cannot be imported, or unpickling is forbidden
        and ``safe_load_type`` is not usable
    ValueError
        If the resolved class is not a subclass of ``parent_class``
    """
    if class_name in _type_cache:
        cls = _type_cache[class_name]
    else:
        try:
            from .deserializer import safe_load_type

            cls = safe_load_type(class_name, parent_class)
        except ImportError:
            if pickle.is_unpickle_forbidden():
                raise

            mod_name, cls_name = class_name.rsplit("#", 1)

            try:
                cls = importlib.import_module(mod_name)
            except ImportError as ex:
                raise ImportError(
                    f"Failed to import {mod_name} when loading "
                    f"class {class_name}, {ex}"
                ) from None

            # walk through nested classes, starting from the module
            for sub_cls_name in cls_name.split("."):
                cls = getattr(cls, sub_cls_name)
        _type_cache[class_name] = cls

    # the check is applied to cached entries as well, as the cache
    # is keyed by class name only
    if not issubclass(cls, parent_class):
        raise ValueError(f"Class {class_name} not a {parent_class}")
    return cls


cpdef void clear_type_cache():
    """Remove all classes cached by load_type()"""
    _type_cache.clear()


cdef Serializer get_deserializer(int32_t deserializer_id):
    """Returns the serializer instance registered under given id"""
    return _deserializers[deserializer_id]


cdef class Serializer:
    """
    Base class of serializers.

    Subclasses implement ``serial`` and ``deserial`` and are bound to
    object types with ``register``.
    """
    serializer_id = None
    # key of the context entry holding data shared via put_public_data()
    _public_data_context_key = 0x7fffffff - 1

    def __cinit__(self):
        # make the value can be referenced with C code
        self._serializer_id = self.serializer_id

    cpdef bint is_public_data_exist(self, dict context, object key):
        """Returns whether ``key`` is stored in public data of the context"""
        cdef dict public_dict = context.get(self._public_data_context_key, None)
        if public_dict is None:
            return False
        return key in public_dict

    cpdef put_public_data(self, dict context, object key, object value):
        """Stores ``value`` under ``key`` in public data of the context"""
        cdef dict public_dict = context.get(self._public_data_context_key, None)
        if public_dict is None:
            # create the public dict lazily on first write
            public_dict = context[self._public_data_context_key] = {}
        public_dict[key] = value

    cpdef get_public_data(self, dict context, object key):
        """Returns public data stored under ``key``, or None when absent"""
        cdef dict public_dict = context.get(self._public_data_context_key, None)
        if public_dict is None:
            return None
        return public_dict.get(key)

    cpdef serial(self, object obj, dict context):
        """
        Returns intermediate serialization result of certain object.
        The returned value can be a Placeholder or a tuple comprising
        of three parts: a header, a group of subcomponents and
        a finalizing flag.

        * Header is a pickle-serializable tuple
        * Subcomponents are parts or buffers for iterative
          serialization.
        * Flag is a boolean value. If true, subcomponents should be
          buffers (for instance, bytes, memory views, GPU buffers,
          etc.) that can be read and written directly. If false,
          subcomponents will be serialized iteratively.

        Parameters
        ----------
        obj: Any
            Object to serialize
        context: Dict
            Serialization context to help creating Placeholder objects
            for reducing duplicated serialization

        Returns
        -------
        result: Placeholder | Tuple[Tuple, List, bool]
            Intermediate result of serialization
        """
        raise NotImplementedError

    cpdef deserial(self, list serialized, dict context, list subs):
        """
        Returns deserialized object given serialized headers and
        deserialized subcomponents.

        Parameters
        ----------
        serialized: List
            Serialized object header as a tuple
        context
            Serialization context for instantiation of Placeholder
            objects
        subs: List
            Deserialized subcomponents

        Returns
        -------
        result: Any
            Deserialized objects
        """
        raise NotImplementedError

    cpdef on_deserial_error(
        self,
        list serialized,
        dict context,
        list subs_serialized,
        int error_index,
        object exc,
    ):
        """
        Returns rewritten exception when subcomponent deserialization fails

        Parameters
        ----------
        serialized: List
            Serialized object header as a tuple
        context
            Serialization context for instantiation of Placeholder
            objects
        subs_serialized: List
            Serialized subcomponents
        error_index: int
            Index of subcomponent causing error
        exc: BaseException
            Exception raised

        Returns
        -------
        exc: BaseException | None
            Rewritten exception. If None, original exception is kept.
        """
        return None

    @classmethod
    def calc_default_serializer_id(cls):
        """
        Returns a serializer id derived from the MD5 digest of the
        class's module and qualified name, reduced modulo
        ``_SERIALIZER_ID_PRIME``.
        """
        s = f"{cls.__module__}.{cls.__qualname__}"
        h = hashlib.md5(s.encode())
        return int(h.hexdigest(), 16) % _SERIALIZER_ID_PRIME

    @classmethod
    def register(cls, obj_type, name=None):
        """
        Register an instance of this serializer for ``obj_type``.

        Parameters
        ----------
        obj_type: type
            Type of objects handled by the serializer
        name: str | None
            When given, the serializer is registered under
            ``NamedType(name, obj_type)`` instead of the bare type
        """
        if (
            cls.serializer_id is None
            or cls.serializer_id == getattr(super(cls, cls), "serializer_id", None)
        ):
            # a class should have its own serializer_id
            # inherited serializer_id not acceptable
            cls.serializer_id = cls.calc_default_serializer_id()

        inst = cls()
        if name is not None:
            obj_type = NamedType(name, obj_type)
        _serial_dispatcher.register(obj_type, inst)
        if _deserializers.get(cls.serializer_id) is not None:
            # an id may be reused by the same serializer class only
            assert type(_deserializers[cls.serializer_id]) is cls
        else:
            _deserializers[cls.serializer_id] = inst

    @classmethod
    def unregister(cls, obj_type, name=None):
        """
        Remove the handler registered for ``obj_type`` (or the named
        type when ``name`` is given) and drop the deserializer entry
        stored under the serializer id of this class.
        """
        if name is not None:
            obj_type = NamedType(name, obj_type)
        _serial_dispatcher.unregister(obj_type)
        _deserializers.pop(cls.serializer_id, None)

    @classmethod
    def dump_handlers(cls):
        """Returns the handlers dumped by the serializer dispatcher"""
        return _serial_dispatcher.dump_handlers()

    @classmethod
    def load_handlers(cls, *args):
        """Loads handlers into the serializer dispatcher"""
        _serial_dispatcher.load_handlers(*args)


cdef inline uint64_t _fast_id(PyObject * obj) nogil:
    # the address of the object is used as its identifier
    return <uintptr_t>obj


def fast_id(obj):
    """C version of id() used for serialization"""
    return _fast_id(<PyObject *>obj)


def buffered(func):
    """
    Wrapper for serial() method to reduce duplicated serialization
    """
    @wraps(func)
    def wrapped(self, obj: Any, dict context):
        cdef uint64_t obj_id = _fast_id(<PyObject*>obj)
        if obj_id in context:
            # object already met in this context, refer to it by id
            return Placeholder(obj_id)
        else:
            context[obj_id] = obj
            return func(self, obj, context)

    return wrapped


def pickle_buffers(obj):
    """
    Pickle an object into a list of buffers.

    The first element is the pickled payload. When pickle protocol 5
    is available, out-of-band buffers collected through the buffer
    callback follow as 1-d memoryviews.
    """
    cdef list buffers = [None]

    if HAS_PICKLE_BUFFER:

        def buffer_cb(x):
            x = x.raw()
            if x.ndim > 1:
                # ravel n-d memoryview
                x = x.cast(x.format)
            buffers.append(memoryview(x))

        buffers[0] = pickle.dumps(
            obj,
            buffer_callback=buffer_cb,
            protocol=BUFFER_PICKLE_PROTOCOL,
        )
    else:  # pragma: no cover
        buffers[0] = pickle.dumps(obj)
    return buffers


def unpickle_buffers(buffers):
    """
    Rebuild an object from buffers produced by pickle_buffers().

    NOTE: this calls ``pickle.loads`` on the buffers given and thus
    must only be used on data from trusted sources, unless the wrapped
    pickle module restricts unpickling.
    """
    result = pickle.loads(buffers[0], buffers=buffers[1:])

    # as pandas prior to 1.1.0 use _data instead of _mgr to hold BlockManager,
    # deserializing from high versions may produce mal-functioned pandas objects,
    # thus the patch is needed
    if _PANDAS_HAS_MGR:
        return result
    else:  # pragma: no cover
        if hasattr(result, "_mgr") and isinstance(result, (pd.DataFrame, pd.Series)):
            result._data = getattr(result, "_mgr")
            delattr(result, "_mgr")
        return result


cdef class PickleContainer:
    """Holder of pickled buffers whose unpickling is deferred to get()"""
    cdef:
        list buffers

    def __init__(self, list buffers):
        self.buffers = buffers

    cpdef get(self):
        """Unpickle the buffers held and return the object"""
        return unpickle_buffers(self.buffers)

    cpdef list get_buffers(self):
        """Returns the raw buffers held"""
        return self.buffers


cdef class PickleSerializer(Serializer):
    """Fallback serializer relying on pickle"""
    serializer_id = PICKLE_SERIALIZER

    cpdef serial(self, obj: Any, dict context):
        cdef uint64_t obj_id
        obj_id = _fast_id(<PyObject*>obj)
        if obj_id in context:
            return Placeholder(obj_id)
        context[obj_id] = obj

        if type(obj) is PickleContainer:
            # buffers are already pickled, pass them through
            return [], (<PickleContainer>obj).get_buffers(), True
        return [], pickle_buffers(obj), True

    cpdef deserial(self, list serialized, dict context, list subs):
        from .deserializer import deserial_pickle
        return deserial_pickle(serialized, context, subs)


cdef set _primitive_types = {
    type(None),
    bool,
    int,
    float,
}


cdef class PrimitiveSerializer(Serializer):
    """Serializer storing the value directly in the header"""
    serializer_id = PRIMITIVE_SERIALIZER

    cpdef serial(self, object obj, dict context):
        return [obj,], [], True

    cpdef deserial(self, list obj, dict context, list subs):
        return obj[0]


cdef class BytesSerializer(Serializer):
    """Serializer passing the object itself as the only buffer"""
    serializer_id = BYTES_SERIALIZER

    cpdef serial(self, obj: Any, dict context):
        cdef uint64_t obj_id
        obj_id = _fast_id(<PyObject*>obj)
        if obj_id in context:
            return Placeholder(obj_id)
        context[obj_id] = obj

        return [], [obj], True

    cpdef deserial(self, list serialized, dict context, list subs):
        return subs[0]


cdef class StrSerializer(Serializer):
    """Serializer encoding strings into a bytes buffer"""
    serializer_id = STR_SERIALIZER

    cpdef serial(self, obj: Any, dict context):
        cdef uint64_t obj_id
        obj_id = _fast_id(<PyObject*>obj)
        if obj_id in context:
            return Placeholder(obj_id)
        context[obj_id] = obj

        return [], [(<str>obj).encode()], True

    cpdef deserial(self, list serialized, dict context, list subs):
        buffer = subs[0]
        if type(buffer) is memoryview:
            # memoryview has no decode(), convert into bytes first
            buffer = buffer.tobytes()
        return buffer.decode()


cdef class CollectionSerializer(Serializer):
    """Base class of serializers for lists, tuples and dicts"""
    obj_type = None

    cdef object _obj_type

    def __cinit__(self):
        # make the value can be referenced with C code
        self._obj_type = self.obj_type

    cdef tuple _serial_iterable(self, obj: Any):
        """
        Split an iterable into items kept inline in the header and
        items to be serialized as subcomponents.

        Returns a tuple of ``[items, indices]``, the list of
        subcomponents and False. ``items`` holds None at every position
        listed in ``indices``, whose objects are in the subcomponents.
        """
        cdef list idx_to_propagate = []
        cdef list obj_to_propagate = []
        cdef list obj_list = <list>obj if type(obj) is list else list(obj)
        cdef int64_t idx
        cdef object item

        for idx in range(len(obj_list)):
            item = obj_list[idx]

            if type(item) is bytes and len(<bytes>item) < _MAX_STR_PRIMITIVE_LEN:
                # treat short bytes as primitives
                continue
            elif type(item) is str and len(<str>item) < _MAX_STR_PRIMITIVE_LEN:
                # treat short strings as primitives
                continue
            elif type(item) in _primitive_types:
                continue

            if obj is obj_list:
                # copy before the first write to keep the input list untouched
                obj_list = list(obj)

            obj_list[idx] = None
            idx_to_propagate.append(idx)
            obj_to_propagate.append(item)

        return [obj_list, idx_to_propagate], obj_to_propagate, False

    cpdef serial(self, obj: Any, dict context):
        cdef uint64_t obj_id
        obj_id = _fast_id(<PyObject*>obj)
        if obj_id in context:
            return Placeholder(obj_id)
        context[obj_id] = obj

        return self._serial_iterable(obj)

    cdef list _deserial_iterable(self, list serialized, list subs):
        """
        Put deserialized subcomponents back to their positions.
        Note that the item list in ``serialized`` is updated in place
        and returned.
        """
        cdef list res_list, idx_to_propagate
        cdef int64_t i

        res_list, idx_to_propagate = serialized

        for i in range(len(idx_to_propagate)):
            res_list[idx_to_propagate[i]] = subs[i]
        return res_list


cdef class TupleSerializer(CollectionSerializer):
    """Serializer of tuples, with named tuples supported"""
    serializer_id = TUPLE_SERIALIZER
    obj_type = tuple

    cpdef serial(self, obj: Any, dict context):
        cdef uint64_t obj_id
        cdef list header
        cdef object data, is_leaf

        obj_id = _fast_id(<PyObject*>obj)
        if obj_id in context:
            return Placeholder(obj_id)
        context[obj_id] = obj

        header, data, is_leaf = self._serial_iterable(obj)
        if hasattr(type(obj), "_fields"):
            # types with _fields are treated as named tuples, record
            # the type name in the form accepted by load_type()
            header.append(type(obj).__module__ + "#" + type(obj).__qualname__)
        else:
            header.append(None)
        return header, data, is_leaf

    cpdef deserial(self, list serialized, dict context, list subs):
        cdef list res
        cdef str tuple_type_name = serialized[-1]

        res = self._deserial_iterable(serialized[:-1], subs)
        for v in res:
            # tuples cannot be updated afterwards, thus all elements
            # must be resolved here
            assert type(v) is not Placeholder

        if tuple_type_name is None:
            return tuple(res)
        else:
            tuple_type = load_type(tuple_type_name, tuple)
            return tuple_type(*res)


cdef class ListSerializer(CollectionSerializer):
    """Serializer of lists"""
    serializer_id = LIST_SERIALIZER
    obj_type = list

    cpdef deserial(self, list serialized, dict context, list subs):
        cdef int64_t idx
        cdef list res = self._deserial_iterable(serialized, subs)

        result = list(res)

        for idx, v in enumerate(res):
            if type(v) is Placeholder:
                # replace the placeholder in the result once resolved
                cb = partial(result.__setitem__, idx)
                (<Placeholder>v).callbacks.append(cb)
        return result


def _dict_key_replacer(ret, key, real_key):
    """Placeholder callback moving the value under ``key`` to ``real_key``"""
    ret[real_key] = ret.pop(key)


def _dict_value_replacer(context, ret, key, real_value):
    """Placeholder callback storing ``real_value`` under ``key``"""
    if type(key) is Placeholder:
        # look up the object recorded in the context under the
        # placeholder's id and use it as the key
        key = context[(<Placeholder>key).id]
    ret[key] = real_value


cdef:
    object _TYPE_CHAR_ORDERED_DICT = "O"


cdef class DictSerializer(CollectionSerializer):
    """Serializer of dicts, with OrderedDict flagged in the header"""
    serializer_id = DICT_SERIALIZER

    cpdef serial(self, obj: Any, dict context):
        cdef uint64_t obj_id
        cdef list key_obj, value_obj
        cdef list key_bufs, value_bufs

        if type(obj) is dict and len(<dict>obj) == 0:
            # empty plain dicts are represented with an empty header
            return [], [], True

        obj_id = _fast_id(<PyObject*>obj)
        if obj_id in context:
            return Placeholder(obj_id)
        context[obj_id] = obj

        if isinstance(obj, OrderedDict):
            ser_type = _TYPE_CHAR_ORDERED_DICT
        else:
            ser_type = None

        key_obj, key_bufs, _ = self._serial_iterable(obj.keys())
        value_obj, value_bufs, _ = self._serial_iterable(obj.values())
        # number of key subcomponents is recorded to split the merged
        # subcomponent list on deserialization
        ser_obj = [key_obj, value_obj, len(key_bufs), ser_type]
        return ser_obj, key_bufs + value_bufs, False

    cpdef deserial(self, list serialized, dict context, list subs):
        cdef int64_t i, num_key_bufs
        cdef list key_subs, value_subs, keys, values

        if not serialized:
            return {}
        if len(serialized) == 1:
            # serialized directly
            return serialized[0]

        key_serialized, value_serialized, num_key_bufs, ser_type = serialized
        key_subs = subs[:num_key_bufs]
        value_subs = subs[num_key_bufs:]

        keys = self._deserial_iterable(<list>key_serialized, key_subs)
        values = self._deserial_iterable(<list>value_serialized, value_subs)

        if ser_type == _TYPE_CHAR_ORDERED_DICT:
            ret = OrderedDict(zip(keys, values))
        else:
            ret = dict(zip(keys, values))

        # register callbacks to fix keys and values still being placeholders
        for i in range(len(keys)):
            k, v = keys[i], values[i]
            if type(k) is Placeholder:
                (<Placeholder>k).callbacks.append(
                    partial(_dict_key_replacer, ret, k)
                )
            if type(v) is Placeholder:
                (<Placeholder>v).callbacks.append(
                    partial(_dict_value_replacer, context, ret, k)
                )
        return ret


cdef class PyDatetimeSerializer(Serializer):
    """Serializer of datetime.datetime as a timestamp plus timezone"""
    serializer_id = PY_DATETIME_SERIALIZER

    cpdef serial(self, obj: datetime.datetime, dict context):
        cdef list ser_tz = (
            _serial_tz(obj.tzinfo) if obj.tzinfo is not None else None
        )
        return [obj.timestamp(), ser_tz], [], True

    cpdef deserial(self, list serialized, dict context, list subs):
        cdef object tz = (
            _deserialize_tz(serialized[1]) if serialized[1] is not None else None
        )
        return datetime.datetime.fromtimestamp(serialized[0], tz)


cdef class PyDateSerializer(Serializer):
    """Serializer of datetime.date as a proleptic Gregorian ordinal"""
    serializer_id = PY_DATE_SERIALIZER

    cpdef serial(self, obj: datetime.date, dict context):
        return [obj.toordinal()], [], True

    cpdef deserial(self, list serialized, dict context, list subs):
        return datetime.date.fromordinal(serialized[0])


cdef class PyTimedeltaSerializer(Serializer):
    """Serializer of datetime.timedelta as days, seconds and microseconds"""
    serializer_id = PY_TIMEDELTA_SERIALIZER

    cpdef serial(self, obj: datetime.timedelta, dict context):
        return [obj.days, obj.seconds, obj.microseconds], [], True

    cpdef deserial(self, list serialized, dict context, list subs):
        return datetime.timedelta(
            days=serialized[0],
            seconds=serialized[1],
            microseconds=serialized[2],
        )


cdef:
    object _TYPE_CHAR_TZ_BASE = "S"
    object _TYPE_CHAR_TZ_ZONEINFO = "ZI"
    object _TYPE_CHAR_TZ_PYTZ = "PT"


cdef inline list _serial_tz(
    obj: datetime.tzinfo, dt: Optional[datetime.datetime] = None
):
    """
    Serialize a tzinfo object into a list.

    pytz and zoneinfo timezones are stored by zone name. Other tzinfo
    objects are stored as their name and UTC offset in whole seconds,
    both evaluated at ``dt`` (current time when not given).
    """
    if isinstance(obj, PyTZ_BaseTzInfo):
        return [_TYPE_CHAR_TZ_PYTZ, obj.zone]
    elif isinstance(obj, ZoneInfo):
        return [_TYPE_CHAR_TZ_ZONEINFO, obj.key]
    else:
        dt = dt or datetime.datetime.now()
        return [
            _TYPE_CHAR_TZ_BASE,
            obj.tzname(dt),
            int(obj.utcoffset(dt).total_seconds()),
        ]


cdef inline object _deserialize_tz(list serialized):
    """Rebuild a tzinfo object from the list generated by _serial_tz()"""
    if serialized[0] == _TYPE_CHAR_TZ_PYTZ:
        return pytz.timezone(serialized[1])
    elif serialized[0] == _TYPE_CHAR_TZ_ZONEINFO:
        return zoneinfo.ZoneInfo(serialized[1])
    else:
        if serialized[2] == 0:
            # zero offsets are always mapped to the UTC singleton
            return datetime.timezone.utc
        return datetime.timezone(
            datetime.timedelta(seconds=serialized[2]), name=serialized[1]
        )


cdef class TZInfoSerializer(Serializer):
    """Serializer of datetime.tzinfo objects"""
    serializer_id = PY_TZINFO_SERIALIZER

    cpdef serial(self, object obj: datetime.tzinfo, dict context):
        return _serial_tz(obj), [], True

    cpdef deserial(self, list serialized, dict context, list subs):
        return _deserialize_tz(serialized)


cdef:
    object _TYPE_CHAR_DTYPE_NUMPY = "N"
    object _TYPE_CHAR_DTYPE_PANDAS_ARROW = "PA"
    object _TYPE_CHAR_DTYPE_PANDAS_CATEGORICAL = "PC"
    object _TYPE_CHAR_DTYPE_PANDAS_INTERVAL = "PI"
    object _TYPE_CHAR_DTYPE_PANDAS_EXTENSION = "PE"


cdef class DtypeSerializer(Serializer):
    """Serializer of numpy dtypes and pandas extension dtypes"""
    serializer_id = DTYPE_SERIALIZER

    @staticmethod
    def _sort_fields(fields):
        """
        Returns field names sorted by the second element of their
        entries in ``fields``.

        ``fields`` is the mapping taken from ``np.dtype.fields``, thus
        the argument must not be restricted to lists. numpy documents
        the second element of every entry as the byte offset of the field.
        """
        return sorted(fields, key=lambda k: fields[k][1])

    cpdef serial(self, obj: Union[np.dtype, ExtensionDtype], dict context):
        if isinstance(obj, np.dtype):
            try:
                return [
                    _TYPE_CHAR_DTYPE_NUMPY, np.lib.format.dtype_to_descr(obj), None
                ], [], True
            except ValueError:
                # retry with fields sorted, and record the original
                # order of fields to restore it on deserialization
                fields = obj.fields
                new_fields = self._sort_fields(fields)
                desc = np.lib.format.dtype_to_descr(obj[new_fields])
                dtype_new_order = list(fields)
                return [_TYPE_CHAR_DTYPE_NUMPY, desc, dtype_new_order], [], True
        elif isinstance(obj, ExtensionDtype):
            # ArrowDtype can be None when not supported in current
            # environment. Only the arrow check is skipped then, as other
            # extension dtypes can still be serialized.
            if not _ARROW_DTYPE_NOT_SUPPORTED and isinstance(obj, ArrowDtype):
                return [_TYPE_CHAR_DTYPE_PANDAS_ARROW, str(obj.pyarrow_dtype)], [], True
            elif isinstance(obj, pd.CategoricalDtype):
                return [
                    _TYPE_CHAR_DTYPE_PANDAS_CATEGORICAL, obj.ordered
                ], [obj.categories], False
            elif isinstance(obj, pd.IntervalDtype):
                return [
                    _TYPE_CHAR_DTYPE_PANDAS_INTERVAL, obj.closed
                ], [obj.subdtype], False
            else:
                return [_TYPE_CHAR_DTYPE_PANDAS_EXTENSION, repr(obj)], [], True
        else:
            raise NotImplementedError(f"Does not support serializing dtype {obj!r}")

    cpdef deserial(self, list serialized, dict context, list subs):
        cdef str ser_type = serialized[0]
        if ser_type == _TYPE_CHAR_DTYPE_NUMPY:
            try:
                dt = np.lib.format.descr_to_dtype(serialized[1])
            except AttributeError:
                # descr_to_dtype is not available, fall back to np.dtype
                dt = np.dtype(serialized[1])

            if serialized[2] is not None:
                # fill dtype_new_order field
                dt = dt[serialized[2]]
            return dt
        elif ser_type == _TYPE_CHAR_DTYPE_PANDAS_ARROW:
            if _ARROW_DTYPE_NOT_SUPPORTED:
                raise ImportError("ArrowDtype is not supported in current environment")
            return ArrowDtype(arrow_type_from_str(serialized[1]))
        elif ser_type == _TYPE_CHAR_DTYPE_PANDAS_CATEGORICAL:
            return pd.CategoricalDtype(subs[0], serialized[1])
        elif ser_type == _TYPE_CHAR_DTYPE_PANDAS_INTERVAL:
            return pd.IntervalDtype(subs[0], serialized[1])
        elif ser_type == _TYPE_CHAR_DTYPE_PANDAS_EXTENSION:
            if serialized[1] == "StringDtype":  # for legacy pandas version
                return pd.StringDtype()
            return pandas_dtype(serialized[1])
        else:
            raise NotImplementedError(f"Unknown serialization type {ser_type}")


cdef class ComplexSerializer(Serializer):
    """Serializer of complex numbers as real and imaginary parts"""
    serializer_id = COMPLEX_SERIALIZER

    cpdef serial(self, object obj: complex, dict context):
        cdef complex cplx = <complex>obj
        return [cplx.real, cplx.imag], [], True

    cpdef deserial(self, list serialized, dict context, list subs):
        return complex(*serialized[:2])


cdef class SliceSerializer(Serializer):
    """Serializer of slices as start, stop and step"""
    serializer_id = SLICE_SERIALIZER

    cpdef serial(self, object obj: slice, dict context):
        return [obj.start, obj.stop, obj.step], [], True

    cpdef deserial(self, list serialized, dict context, list subs):
        return slice(*serialized[:3])


cdef class RegexSerializer(Serializer):
    """Serializer of compiled regular expressions with str patterns"""
    serializer_id = REGEX_SERIALIZER

    cpdef serial(self, object obj: re.Pattern, dict context):
        cdef uint64_t obj_id
        obj_id = _fast_id(<PyObject*>obj)
        if obj_id in context:
            return Placeholder(obj_id)
        context[obj_id] = obj

        return [obj.flags], [(<str>(obj.pattern)).encode()], True

    cpdef deserial(self, list serialized, dict context, list subs):
        return re.compile((<bytes>(subs[0])).decode(), serialized[0])


cdef class NoDefaultSerializer(Serializer):
    """Serializer always deserializing into ``no_default``"""
    serializer_id = NO_DEFAULT_SERIALIZER

    cpdef serial(self, object obj, dict context):
        return [], [], True

    cpdef deserial(self, list obj, dict context, list subs):
        return no_default


cdef class Placeholder:
    """
    Placeholder object to reduce duplicated serialization

    The object records object identifier and keeps callbacks
    to replace itself in parent objects.
    """
    def __init__(self, uint64_t id_):
        self.id = id_
        self.callbacks = []

    def __hash__(self):
        return self.id

    def __eq__(self, other):  # pragma: no cover
        if type(other) is not Placeholder:
            return False
        return self.id == other.id

    def __repr__(self):
        return (
            f"Placeholder(id={self.id}, "
            f"callbacks=[list of {len(self.callbacks)}])"
        )


cdef class PlaceholderSerializer(Serializer):
    """
    Serializer of Placeholder objects. The header produced is empty,
    and deserial() always returns ``Placeholder(0)``.
    """
    serializer_id = PLACEHOLDER_SERIALIZER

    cpdef serial(self, obj: Any, dict context):
        return [], [], True

    cpdef deserial(self, list serialized, dict context, list subs):
        return Placeholder(0)


PickleSerializer.register(object)
for _primitive in _primitive_types:
    PrimitiveSerializer.register(_primitive)
BytesSerializer.register(bytes)
BytesSerializer.register(memoryview)
StrSerializer.register(str)
ListSerializer.register(list)
TupleSerializer.register(tuple)
DictSerializer.register(dict)
PyDatetimeSerializer.register(datetime.datetime)
PyDateSerializer.register(datetime.date)
PyTimedeltaSerializer.register(datetime.timedelta)
TZInfoSerializer.register(datetime.tzinfo)
DtypeSerializer.register(np.dtype)
DtypeSerializer.register(ExtensionDtype)
ComplexSerializer.register(complex)
SliceSerializer.register(slice)
RegexSerializer.register(re.Pattern)
NoDefaultSerializer.register(NoDefault)
PlaceholderSerializer.register(Placeholder)


cdef class _SerialStackItem:
    """
    Stack item of iterative serialization, holding the header of an
    object, its subcomponents and serialized results of subcomponents
    processed so far.
    """
    cdef public list serialized
    cdef public list subs
    cdef public list subs_serialized

    def __cinit__(self, list serialized, list subs):
        self.serialized = serialized
        self.subs = subs
        self.subs_serialized = []


cdef class _IdContextHolder:
    """
    Holder of the mapping from object ids to ordered ids assigned
    during serialization, and the counter of ordered ids.
    """
    cdef public unordered_map[uint64_t, uint64_t] d
    cdef public uint64_t obj_count

    def __cinit__(self):
        self.obj_count = 0


cdef tuple _serial_single(
    obj, dict context, _IdContextHolder id_context_holder
):
    """Serialize single object and return serialized tuples"""
    cdef uint64_t obj_id, ordered_id
    cdef Serializer serializer
    cdef int serializer_id
    cdef list common_header, serialized, subs

    while True:
        name = context.get("serializer")
        obj_type = type(obj) if name is None else NamedType(name, type(obj))
        serializer = _serial_dispatcher.get_handler(obj_type)
        serializer_id = serializer._serializer_id
        ret_serial = serializer.serial(obj, context)
        if type(ret_serial) is tuple:
            # object is serialized, form a common header and return
            serialized, subs, final = <tuple>ret_serial

            if type(obj) is Placeholder:
                # reuse the ordered id of the object referred to
                obj_id = (<Placeholder>obj).id
                ordered_id = id_context_holder.d[obj_id]
            else:
                ordered_id = id_context_holder.obj_count
                id_context_holder.obj_count += 1
                # only need to record object ids for non-primitive types
                if serializer_id != PRIMITIVE_SERIALIZER:
                    obj_id = _fast_id(<PyObject*>obj)
                    id_context_holder.d[obj_id] = ordered_id

            # REMEMBER to change _COMMON_HEADER_LEN when content of
            # this header changed
            common_header = [
                serializer_id, ordered_id, len(subs), final
            ]
            break
        else:
            # object is converted into another (usually a Placeholder)
            obj = ret_serial
    common_header.extend(serialized)
    return common_header, subs, final


class _SerializeObjectOverflow(Exception):
    """
    Raised by _serialize_with_stack() when the number of serialized
    objects exceeds the limit given, carrying the state to resume.
    """
    def __init__(self, list cur_serialized, int num_total_serialized):
        super(_SerializeObjectOverflow, self).__init__(cur_serialized)
        self.cur_serialized = cur_serialized
        self.num_total_serialized = num_total_serialized


cpdef object _serialize_with_stack(
    list serial_stack,
    list serialized,
    dict context,
    _IdContextHolder id_context_holder,
    list result_bufs_list,
    int64_t num_overflow = 0,
    int64_t num_total_serialized = 0,
):
    """
    Serialize objects in the stack iteratively.

    Parameters
    ----------
    serial_stack: List
        Stack of _SerialStackItem objects to process
    serialized: List | None
        Serialized result to be recorded into the item at stack top
        first, used when resuming
    context: Dict
        Serialization context
    id_context_holder: _IdContextHolder
        Holder of ordered ids of objects
    result_bufs_list: List
        List collecting buffers of leaf objects
    num_overflow: int
        When positive, _SerializeObjectOverflow is raised once
        the number of serialized objects exceeds the value
    num_total_serialized: int
        Number of objects already serialized. A positive value marks
        the call as a resumed one.

    Returns
    -------
    result: Tuple[List, List]
        ``[extra_meta, serialized]`` and the list of buffers
    """
    cdef _SerialStackItem stack_item
    cdef list subs
    cdef bint final
    cdef int64_t num_sub_serialized
    cdef bint is_resume = num_total_serialized > 0

    while serial_stack:
        stack_item = serial_stack[-1]
        if serialized is not None:
            # have previously-serialized results, record first
            stack_item.subs_serialized.append(serialized)

        num_sub_serialized = len(stack_item.subs_serialized)
        if len(stack_item.subs) == num_sub_serialized:
            # all subcomponents serialized, serialization of current is done
            # and we can move to the parent object
            serialized = stack_item.serialized + stack_item.subs_serialized
            num_total_serialized += 1
            serial_stack.pop()
        else:
            # serialize next subcomponent at stack top
            serialized, subs, final = _serial_single(
                stack_item.subs[num_sub_serialized], context, id_context_holder
            )
            num_total_serialized += 1
            if final or not subs:
                # the subcomponent is a leaf
                if subs:
                    result_bufs_list.extend(subs)
            else:
                # the subcomponent has its own subcomponents, we push itself
                # into stack and process its children
                stack_item = _SerialStackItem(serialized, subs)
                serial_stack.append(stack_item)
                # note that the serialized header should not be recorded
                # as we are now processing the subcomponent itself
                serialized = None
        if 0 < num_overflow < num_total_serialized:
            raise _SerializeObjectOverflow(serialized, num_total_serialized)

    # we keep an empty dict for extra metas required for other modules
    if is_resume:
        # returns num of serialized objects when resumed
        extra_meta = {"_N": num_total_serialized}
    else:
        # otherwise does not record the number to reduce result size
        extra_meta = {}
    return [extra_meta, serialized], result_bufs_list


def serialize(obj, dict context = None):
    """
    Serialize an object and return a header and buffers.
    Buffers are intended for zero-copy data manipulation.

    Parameters
    ----------
    obj: Any
        Object to serialize
    context: Dict
        Serialization context for instantiation of Placeholder objects

    Returns
    -------
    result: Tuple[Tuple, List]
        Picklable header and buffers
    """
    cdef list serial_stack = []
    cdef list result_bufs_list = []
    cdef list serialized
    cdef list subs
    cdef bint final
    cdef _IdContextHolder id_context_holder = _IdContextHolder()
    cdef tuple result

    context = context if context is not None else dict()
    serialized, subs, final = _serial_single(obj, context, id_context_holder)
    if final or not subs:
        # marked as a leaf node, return directly
        result = [{}, serialized], subs
    else:
        serial_stack.append(_SerialStackItem(serialized, subs))
        result = _serialize_with_stack(
            serial_stack, None, context, id_context_holder, result_bufs_list
        )
    # attach public data put by serializers into the extra meta
    result[0][0]["_PUB"] = context.get(Serializer._public_data_context_key)
    return result


async def serialize_with_spawn(
    obj,
    dict context = None,
    int spawn_threshold = 100,
    object executor = None
):
    """
    Serialize an object and return a header and buffers.
    Buffers are intended for zero-copy data manipulation.

    Parameters
    ----------
    obj: Any
        Object to serialize
    context: Dict
        Serialization context for instantiation of Placeholder objects
    spawn_threshold: int
        Threshold to spawn into a ThreadPoolExecutor
    executor: ThreadPoolExecutor
        ThreadPoolExecutor to spawn rest serialization into

    Returns
    -------
    result: Tuple[Tuple, List]
        Picklable header and buffers
    """
    cdef list serial_stack = []
    cdef list result_bufs_list = []
    cdef list serialized
    cdef list subs
    cdef bint final
    cdef _IdContextHolder id_context_holder = _IdContextHolder()
    cdef tuple result

    context = context if context is not None else dict()
    serialized, subs, final = _serial_single(obj, context, id_context_holder)
    if final or not subs:
        # marked as a leaf node, return directly
        result = [{}, serialized], subs
    else:
        serial_stack.append(_SerialStackItem(serialized, subs))

        try:
            result = _serialize_with_stack(
                serial_stack,
                None,
                context,
                id_context_holder,
                result_bufs_list,
                spawn_threshold,
            )
        except _SerializeObjectOverflow as ex:
            # too many objects to serialize, resume the rest of
            # serialization in the executor without a limit
            result = await asyncio.get_running_loop().run_in_executor(
                executor,
                _serialize_with_stack,
                serial_stack,
                ex.cur_serialized,
                context,
                id_context_holder,
                result_bufs_list,
                0,
                ex.num_total_serialized,
            )
    # attach public data put by serializers into the extra meta
    result[0][0]["_PUB"] = context.get(Serializer._public_data_context_key)
    return result


# deserialize function imported on first call of deserialize()
cdef object deserialize_impl


def deserialize(list serialized, list buffers, dict context = None):
    """
    Deserialize an object with serialized headers and buffers

    Parameters
    ----------
    serialized: List
        Serialized object header
    buffers: List
        List of buffers extracted from serialize() calls
    context: Dict
        Serialization context for replacing Placeholder objects

    Returns
    -------
    result: Any
        Deserialized object
    """
    global deserialize_impl

    if deserialize_impl is None:
        from .deserializer import deserialize as deserialize_impl
    return deserialize_impl(serialized, buffers, context)