core/maxframe/serialization/pandas.py (191 lines of code) (raw):

# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Serializers for pandas containers, extension arrays and scalars."""

import calendar
import datetime
import enum
from typing import Any, Dict, List

import pandas as pd
from pandas.api.extensions import ExtensionArray
from pandas.arrays import IntervalArray

from ..utils import no_default
from .core import Serializer, buffered


class DataFrameSerializer(Serializer):
    """Serialize a DataFrame as ``[dtypes, index, *columns]`` sub-objects.

    Columns whose dtype reports ``hasobject`` are sent as Python lists;
    all other columns are sent as ``Series.values``.
    """

    @buffered
    def serial(self, obj: pd.DataFrame, context: Dict):
        col_data = [
            col.tolist() if getattr(col.dtype, "hasobject", False) else col.values
            for _, col in obj.items()
        ]
        return [], [obj.dtypes, obj.index] + col_data, False

    def deserial(
        self, serialized: List, context: Dict, subs: List[Any]
    ) -> pd.DataFrame:
        """Rebuild the DataFrame from ``[dtypes, index, *columns]``.

        ``dtypes.index`` carries the original column labels (possibly
        duplicated, possibly a MultiIndex).
        """
        dtypes, idx = subs[:2]
        col_data = subs[2:]
        if not col_data:
            # pd.concat raises on an empty list, so build the column-less
            # frame directly.
            return pd.DataFrame(index=idx, columns=dtypes.index)
        # Cast every column on its own instead of calling
        # ``DataFrame.astype(dtypes)``: the latter looks dtypes up by
        # column label and fails when labels are duplicated.
        columns = [
            pd.Series(data, index=idx).astype(dtype)
            for dtype, data in zip(dtypes.tolist(), col_data)
        ]
        df = pd.concat(columns, axis=1)
        df.columns = dtypes.index
        df.index = idx
        return df


class SeriesSerializer(Serializer):
    """Serialize a Series as ``[index, name, dtype, data]`` sub-objects."""

    @buffered
    def serial(self, obj: pd.Series, context: Dict):
        # object-like dtypes travel as Python lists, others as ``.values``
        if getattr(obj.dtype, "hasobject", False):
            data = obj.tolist()
        else:
            data = obj.values
        return [], [obj.index, obj.name, obj.dtype, data], False

    def deserial(self, serialized: List, context: Dict, subs: List[Any]) -> pd.Series:
        idx, name, dtype, data = subs
        return pd.Series(data, index=idx, name=name).astype(dtype)


# First header element emitted by IndexSerializer.serial; ``None`` marks a
# plain (non-special) Index.
_TYPE_CHAR_MULTI_INDEX = "M"
_TYPE_CHAR_RANGE_INDEX = "R"
_TYPE_CHAR_CATEGORICAL_INDEX = "C"


class IndexSerializer(Serializer):
    """Serialize any ``pd.Index``.

    MultiIndex, RangeIndex and CategoricalIndex get dedicated encodings
    tagged by the first header element; every other index is sent as
    ``[dtype, name, values]``.
    """

    @buffered
    def serial(self, obj: pd.Index, context: Dict):
        if isinstance(obj, pd.MultiIndex):
            # one Index per level; level names ride along on each Index
            data = [obj.get_level_values(idx) for idx in range(obj.nlevels)]
            header = [_TYPE_CHAR_MULTI_INDEX]
        elif isinstance(obj, pd.RangeIndex):
            # a RangeIndex is fully described by start/stop/step
            data = [obj.name, obj.dtype]
            header = [_TYPE_CHAR_RANGE_INDEX, obj.start, obj.stop, obj.step]
        elif isinstance(obj, pd.CategoricalIndex):
            data = [obj.name, obj.values]
            header = [_TYPE_CHAR_CATEGORICAL_INDEX]
        else:
            if getattr(obj.dtype, "hasobject", False):
                values = obj.tolist()
            else:
                values = obj.values
            data = [obj.dtype, obj.name, values]
            header = [None]
        return header, data, len(data) == 0

    def deserial(self, serialized: List, context: Dict, subs: List[Any]) -> pd.Index:
        """Rebuild the index; raises NotImplementedError on an unknown tag."""
        header = serialized
        if header[0] == _TYPE_CHAR_MULTI_INDEX:
            return pd.MultiIndex.from_arrays(subs)
        elif header[0] == _TYPE_CHAR_RANGE_INDEX:
            name, dtype = subs[:2]
            start, stop, step = header[1:]
            return pd.RangeIndex(start, stop, step, dtype=dtype, name=name)
        elif header[0] == _TYPE_CHAR_CATEGORICAL_INDEX:
            name, data = subs[:2]
            return pd.CategoricalIndex(data, name=name)
        elif header[0] is None:  # Normal index
            dtype, name, values = subs
            return pd.Index(values, dtype=dtype, name=name)
        else:  # pragma: no cover
            raise NotImplementedError(
                f"Deserialization for index header label {header[0]} not implemented"
            )


class CategoricalSerializer(Serializer):
    """Serialize a Categorical as codes plus categories; ``ordered`` is in the header."""

    @buffered
    def serial(self, obj: pd.Categorical, context: Dict):
        return [obj.ordered], [obj.codes, obj.dtype, obj.categories], False

    def deserial(
        self, serialized: List, context: Dict, subs: List[Any]
    ) -> pd.Categorical:
        codes, dtype, categories = subs
        ordered = serialized[0]
        return pd.Categorical.from_codes(codes, categories, ordered=ordered)


# Header tags emitted by ArraySerializer.serial; ``None`` marks the default
# ``[dtype, data]`` encoding.
_TYPE_CHAR_INTERVAL_ARRAY = "I"
# masked (nullable) array that contains at least one missing entry
_TYPE_CHAR_MASKED_ARRAY = "N"


class ArraySerializer(Serializer):
    """Serialize a pandas ``ExtensionArray``.

    * Interval arrays are sent as their ``left`` / ``right`` bounds.
    * String arrays are sent as Python lists.
    * Masked (nullable) arrays are sent as their value buffer, plus their
      boolean mask when any entry is missing.
    * Everything else is sent through the private ``_data`` or
      ``_pa_array`` attribute.
    """

    @buffered
    def serial(self, obj: ExtensionArray, context: Dict):
        ser_type = None
        dtype = obj.dtype
        if isinstance(dtype, pd.IntervalDtype):
            ser_type = _TYPE_CHAR_INTERVAL_ARRAY
            data_parts = [obj.left, obj.right]
        elif isinstance(dtype, pd.StringDtype):
            if hasattr(obj, "tolist"):
                data_parts = [obj.tolist()]
            else:
                data_parts = [obj.to_numpy().tolist()]
        elif hasattr(obj, "_data") and hasattr(obj, "_mask"):
            # Masked arrays store values in ``_data`` and missing positions in
            # ``_mask``. Values under the mask are arbitrary, so sending
            # ``_data`` alone would silently turn every NA into a real value.
            # The mask is only attached when needed, so NA-free payloads keep
            # the original ``[dtype, data]`` format.
            data_parts = [obj._data]
            if obj._mask.any():
                ser_type = _TYPE_CHAR_MASKED_ARRAY
                data_parts.append(obj._mask)
        elif hasattr(obj, "_data"):
            data_parts = [obj._data]
        else:
            data_parts = [obj._pa_array]
        return [ser_type], [dtype] + data_parts, False

    def deserial(self, serialized: List, context: Dict, subs: List):
        ser_type = serialized[0]
        if ser_type == _TYPE_CHAR_INTERVAL_ARRAY:
            dtype, left, right = subs
            return IntervalArray.from_arrays(left, right, dtype=dtype)
        elif ser_type == _TYPE_CHAR_MASKED_ARRAY:
            dtype, data, mask = subs
            # pd.array copies by default, so writing into it is safe
            arr = pd.array(data, dtype)
            arr[mask] = pd.NA
            return arr
        else:
            dtype, data = subs
            return pd.array(data, dtype)


# Used to turn whole UTC epoch seconds back into wall-clock fields without
# ``datetime.utcfromtimestamp``.
_UNIX_EPOCH = datetime.datetime(1970, 1, 1)


class PdTimestampSerializer(Serializer):
    """Serialize a Timestamp.

    The header is ``[whole_seconds, microsecond, nanosecond(, unit)]``.
    The tz, if any, is the only sub-object.

    For tz-aware values, ``whole_seconds`` counts UTC epoch seconds. For
    naive values it is the epoch value of the wall clock interpreted in the
    local time zone, which ``deserial`` reverses with
    ``datetime.fromtimestamp``.
    """

    def serial(self, obj: pd.Timestamp, context: Dict):
        if obj.tz is not None:
            zone_info = [obj.tz]
            utc = obj.tz_convert("UTC")
            # Use exact integer arithmetic. ``Timestamp.timestamp()`` rounds
            # to microseconds, so e.g. x.9999996s became x+1s. ``int()``
            # truncates toward zero, which is wrong for pre-1970 instants.
            whole_seconds = calendar.timegm(
                (utc.year, utc.month, utc.day, utc.hour, utc.minute, utc.second)
            )
        else:
            zone_info = []
            # Drop the sub-second part before converting, so the local epoch
            # value is an exact whole number (floored correctly before 1970).
            # warn=False avoids the "discarding nanoseconds" warning; the
            # nanoseconds are sent separately below.
            whole_seconds = int(
                obj.to_pydatetime(warn=False).replace(microsecond=0).timestamp()
            )
        elements = [whole_seconds, obj.microsecond, obj.nanosecond]
        if hasattr(obj, "unit"):
            elements.append(str(obj.unit))
        return elements, zone_info, bool(zone_info)

    def deserial(self, serialized: List, context: Dict, subs: List):
        whole_seconds, microsecond, nanosecond = serialized[:3]
        tz = subs[0] if subs else None
        if tz is not None:
            # UTC wall clock without ``utcfromtimestamp``: that function is
            # deprecated since Python 3.12 and can fail for negative values
            # on some platforms.
            pydt = _UNIX_EPOCH + datetime.timedelta(seconds=whole_seconds)
        else:
            pydt = datetime.datetime.fromtimestamp(whole_seconds)
        kwargs = {
            "year": pydt.year,
            "month": pydt.month,
            "day": pydt.day,
            "hour": pydt.hour,
            "minute": pydt.minute,
            "second": pydt.second,
            "microsecond": microsecond,
            "nanosecond": nanosecond,
        }
        if tz is not None:
            kwargs["tzinfo"] = datetime.timezone.utc
        # the unit element is only present when the sender's pandas had it
        if len(serialized) > 3:
            kwargs["unit"] = serialized[3]
        val = pd.Timestamp(**kwargs)
        if tz is not None:
            val = val.tz_convert(tz)
        return val


class PdTimedeltaSerializer(Serializer):
    """Serialize a Timedelta as ``[seconds, microseconds, nanoseconds, days(, unit)]``."""

    def serial(self, obj: pd.Timedelta, context: Dict):
        elements = [int(obj.seconds), obj.microseconds, obj.nanoseconds, obj.days]
        if hasattr(obj, "unit"):
            elements.append(str(obj.unit))
        return elements, [], True

    def deserial(self, serialized: List, context: Dict, subs: List):
        # Shorter payloads are accepted: days defaults to 0 when absent, and
        # unit is only passed when present.
        days = 0 if len(serialized) < 4 else serialized[3]
        unit = None if len(serialized) < 5 else serialized[4]
        seconds, microseconds, nanoseconds = serialized[:3]
        kwargs = {
            "days": days,
            "seconds": seconds,
            "microseconds": microseconds,
            "nanoseconds": nanoseconds,
        }
        if unit is not None:
            kwargs["unit"] = unit
        return pd.Timedelta(**kwargs)


class NoDefaultSerializer(Serializer):
    """Serialize the ``no_default`` sentinel as an empty payload."""

    def serial(self, obj: enum.Enum, context: Dict):
        return [], [], True

    def deserial(self, serialized: List, context: Dict, subs: List):
        return no_default


DataFrameSerializer.register(pd.DataFrame)
SeriesSerializer.register(pd.Series)
IndexSerializer.register(pd.Index)
CategoricalSerializer.register(pd.Categorical)
ArraySerializer.register(ExtensionArray)
PdTimestampSerializer.register(pd.Timestamp)
PdTimedeltaSerializer.register(pd.Timedelta)
NoDefaultSerializer.register(type(no_default))