core/maxframe/serialization/numpy.py (71 lines of code) (raw):
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Dict, List
import numpy as np
from .core import Serializer, buffered
# Value stored under the header's "type" key when the serialized object is a
# NumPy scalar (i.e. not an ``np.ndarray``); ``deserial`` checks for it and
# returns element 0 of the rebuilt array instead of the array itself.
_TYPE_CHAR_NP_GENERIC = "G"
class NDArraySerializer(Serializer):
    """Serializer for NumPy arrays and NumPy scalars.

    The header records the dtype descriptor, shape, strides and memory order.
    The payload is either the raw bytes of the array (dtypes without Python
    objects) or a flat Python list of the elements (dtypes containing objects).
    """

    @buffered
    def serial(self, obj: np.generic, context: Dict):
        """Split ``obj`` into a header dict and a data payload.

        Args:
            obj: the ``np.ndarray`` or NumPy scalar to serialize.
            context: serialization context; not used by this method.

        Returns:
            A ``([header], [data], is_leaf)`` triple. ``data`` is a
            ``memoryview`` over the raw bytes with ``is_leaf=True`` when the
            dtype holds no Python objects; otherwise it is a flat list of the
            elements with ``is_leaf=False``.
        """
        # Keep Fortran layout when available; anything that is neither C- nor
        # F-contiguous is copied into a C-contiguous array first.
        order = "C"
        if obj.flags.f_contiguous:
            order = "F"
        elif not obj.flags.c_contiguous:
            obj = np.ascontiguousarray(obj)

        try:
            desc = np.lib.format.dtype_to_descr(obj.dtype)
            dtype_new_order = None
        except ValueError:
            # for structured dtype, array[[field2, field1]] will create a view,
            # and dtype_to_descr will fail due to the order.
            # Describe the dtype with fields sorted by byte offset and remember
            # the original field order so deserial can restore it.
            fields = obj.dtype.fields
            new_fields = sorted(fields, key=lambda k: fields[k][1])
            desc = np.lib.format.dtype_to_descr(obj.dtype[new_fields])
            dtype_new_order = list(fields)

        # Mark NumPy scalars so deserial can unwrap them again.
        type_char = _TYPE_CHAR_NP_GENERIC if not isinstance(obj, np.ndarray) else None

        header = dict(
            type=type_char,
            descr=desc,
            dtype_new_order=dtype_new_order,
            shape=list(obj.shape),
            strides=list(obj.strides),
            order=order,
        )

        # Flatten in the recorded order; deserial must use the same order when
        # it restores the shape.
        flattened = obj.ravel(order=order)
        if obj.dtype.hasobject:
            # Python objects have no raw-byte form: emit them as a list.
            is_leaf = False
            data = flattened.tolist()
        else:
            is_leaf = True
            data = memoryview(flattened.view("uint8").data)
        return [header], [data], is_leaf

    def deserial(self, serialized: List, context: Dict, subs: List[Any]):
        """Rebuild the array or scalar described by ``serialized`` and ``subs``.

        Args:
            serialized: list whose first item is the header dict produced by
                ``serial``.
            context: deserialization context; not used by this method.
            subs: list whose first item is the data payload (a buffer of raw
                bytes, or a flat list of elements for object dtypes).

        Returns:
            An ``np.ndarray``, or a NumPy scalar when the header's ``type`` is
            ``_TYPE_CHAR_NP_GENERIC``.
        """
        header = serialized[0]
        try:
            dtype = np.lib.format.descr_to_dtype(header["descr"])
        except AttributeError:  # pragma: no cover
            # for older numpy versions, descr_to_dtype is not implemented
            dtype = np.dtype(header["descr"])

        # Tolerate headers without this key (same style as the "type" lookup).
        dtype_new_order = header.get("dtype_new_order")
        if dtype_new_order:
            # Restore the original field order of a reordered structured dtype.
            dtype = dtype[dtype_new_order]

        if dtype.hasobject:
            shape = tuple(header["shape"])
            if shape == ():
                # Pass the dtype explicitly so NumPy does not re-infer it from
                # the single element (which would drop the object dtype).
                val = np.array(subs[0], dtype=dtype).reshape(shape)
            else:
                # fill empty object array
                val = np.empty(shape, dtype=dtype)
                try:
                    val[(slice(None),) * len(shape)] = subs[0]
                except ValueError:
                    # The flat list cannot be broadcast into ``shape``.
                    # Reshape it with the order used by ``ravel`` in serial,
                    # otherwise F-contiguous arrays come back scrambled.
                    val[(slice(None),) * len(shape)] = np.array(
                        subs[0], dtype=dtype
                    ).reshape(shape, order=header["order"])
        else:
            val = np.ndarray(
                shape=tuple(header["shape"]),
                dtype=dtype,
                buffer=subs[0],
                strides=tuple(header["strides"]),
                order=header["order"],
            )

        if header.get("type") == _TYPE_CHAR_NP_GENERIC:
            # The original object was a NumPy scalar: return the element.
            return np.take(val, 0)
        return val
# Register this serializer for both NumPy scalars (np.generic) and arrays
# (np.ndarray). ``register`` is not defined in this file; presumably it is
# provided by the ``Serializer`` base imported from ``.core``.
NDArraySerializer.register(np.generic)
NDArraySerializer.register(np.ndarray)