core/maxframe/_utils.pyx (408 lines of code) (raw):

# distutils: language = c++
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import importlib
import inspect
import itertools
import os
import pickle
import pkgutil
import time
import types
import uuid
import warnings
from datetime import date, datetime, timedelta, tzinfo
from enum import Enum
from functools import lru_cache, partial
from random import getrandbits
from weakref import WeakSet

import cloudpickle
import numpy as np
import pandas as pd

cimport cython
from cpython cimport PyBytes_FromStringAndSize
from libc.stdint cimport uint8_t, uint32_t, uint_fast64_t
from libc.stdlib cimport free, malloc

from .lib.cython.libcpp cimport mt19937_64

try:
    from pandas.tseries.offsets import Tick as PDTick
except ImportError:
    PDTick = None

from .lib.mmh3 import hash as mmh_hash
from .lib.mmh3 import hash_bytes as mmh_hash_bytes
from .lib.mmh3 import hash_from_buffer as mmh3_hash_from_buffer

cdef bint _has_cupy = bool(pkgutil.find_loader('cupy'))
cdef bint _has_cudf = bool(pkgutil.find_loader('cudf'))
cdef bint _has_sqlalchemy = bool(pkgutil.find_loader('sqlalchemy'))
cdef bint _has_interval_array_inclusive = hasattr(
    pd.arrays.IntervalArray, "inclusive"
)


cdef extern from "MurmurHash3.h":
    void MurmurHash3_x64_128(const void * key, Py_ssize_t len, uint32_t seed, void * out)


cdef bytes _get_maxframe_key(const uint8_t[:] bufferview):
    cdef const uint8_t *data = &bufferview[0]
    cdef uint8_t out[16]
    MurmurHash3_x64_128(data, len(bufferview), 0, out)
    out[0] |= 0xC0
    return PyBytes_FromStringAndSize(<char*>out, 16)


cpdef str to_str(s, encoding='utf-8'):
    if type(s) is str:
        return <str>s
    elif isinstance(s, bytes):
        return (<bytes>s).decode(encoding)
    elif isinstance(s, str):
        return str(s)
    elif s is None:
        return s
    else:
        raise TypeError(f"Could not convert from {s} to str.")


cpdef bytes to_binary(s, encoding='utf-8'):
    if type(s) is bytes:
        return <bytes>s
    elif isinstance(s, unicode):
        return (<unicode>s).encode(encoding)
    elif isinstance(s, bytes):
        return bytes(s)
    elif s is None:
        return None
    else:
        raise TypeError(f"Could not convert from {s} to bytes.")


cpdef unicode to_text(s, encoding='utf-8'):
    if type(s) is unicode:
        return <unicode>s
    elif isinstance(s, bytes):
        return (<bytes>s).decode('utf-8')
    elif isinstance(s, unicode):
        return unicode(s)
    elif s is None:
        return None
    else:
        raise TypeError(f"Could not convert from {s} to unicode.")


_type_dispatchers = WeakSet()

NamedType = collections.namedtuple("NamedType", ["name", "type_"])


cdef class TypeDispatcher:
    def __init__(self):
        self._handlers = dict()
        self._lazy_handlers = dict()
        # store inherited handlers to facilitate unregistering
        self._inherit_handlers = dict()
        _type_dispatchers.add(self)

    cpdef void register(self, object type_, object handler):
        if isinstance(type_, str):
            self._lazy_handlers[type_] = handler
        elif type(type_) is not NamedType and isinstance(type_, tuple):
            for t in type_:
                self.register(t, handler)
        else:
            self._handlers[type_] = handler

    cpdef void unregister(self, object type_):
        if type(type_) is not NamedType and isinstance(type_, tuple):
            for t in type_:
                self.unregister(t)
        else:
            self._lazy_handlers.pop(type_, None)
            self._handlers.pop(type_, None)
            self._inherit_handlers.clear()

    def dump_handlers(self):
        return (
            self._handlers.copy(),
            self._lazy_handlers.copy(),
            self._inherit_handlers.copy(),
        )

    def load_handlers(self, handlers, lazy_handlers, inherit_handlers):
        self._handlers = handlers
        self._lazy_handlers = lazy_handlers
        self._inherit_handlers = inherit_handlers

    cdef _reload_lazy_handlers(self):
        for k, v in self._lazy_handlers.items():
            mod_name, obj_name = k.rsplit('.', 1)
            with warnings.catch_warnings():
                # the lazily-imported cudf warns that no device is found
                # when we set the visible device to -1 for CPU processes;
                # ignore the warning so it does not distract users
                warnings.simplefilter("ignore")
                mod = importlib.import_module(mod_name, __name__)
                self.register(getattr(mod, obj_name), v)
        self._lazy_handlers = dict()

    cpdef get_handler(self, object type_):
        try:
            return self._handlers[type_]
        except KeyError:
            pass

        try:
            return self._inherit_handlers[type_]
        except KeyError:
            self._reload_lazy_handlers()
            if type(type_) is NamedType:
                named_type = partial(NamedType, type_.name)
                mro = itertools.chain(
                    *zip(map(named_type, type_.type_.__mro__), type_.type_.__mro__)
                )
            else:
                mro = type_.__mro__
            for clz in mro:
                # only look up self._handlers for classes in the mro
                handler = self._handlers.get(clz)
                if handler is not None:
                    self._inherit_handlers[type_] = handler
                    return handler
            raise KeyError(f'Cannot dispatch type {type_}')

    def __call__(self, object obj, *args, **kwargs):
        return self.get_handler(type(obj))(obj, *args, **kwargs)

    @staticmethod
    def reload_all_lazy_handlers():
        for dispatcher in _type_dispatchers:
            (<TypeDispatcher>dispatcher)._reload_lazy_handlers()


cdef inline build_canonical_bytes(tuple args, kwargs):
    if kwargs:
        args = args + (kwargs,)
    return pickle.dumps(tokenize_handler(args))


def tokenize(*args, **kwargs):
    return _get_maxframe_key(build_canonical_bytes(args, kwargs)).hex()


def tokenize_int(*args, **kwargs):
    return mmh_hash(build_canonical_bytes(args, kwargs))
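

# Usage sketch (illustrative only, not part of the original module): how
# TypeDispatcher and the tokenize()/tokenize_int() helpers above are
# typically used. The handlers and values shown are hypothetical
# placeholders, not outputs mandated by this module.
#
#     dispatcher = TypeDispatcher()
#     dispatcher.register(int, lambda ob: f"int:{ob}")
#     dispatcher.register("decimal.Decimal", str)      # lazy, resolved on first use
#     dispatcher(42)                                    # -> "int:42"
#
#     key = tokenize(np.arange(10), op="sum")           # 32-char hex digest
#     assert key == tokenize(np.arange(10), op="sum")   # deterministic for equal inputs
#     tokenize_int("a", "b")                            # integer hash via mmh3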
cdef class Tokenizer(TypeDispatcher):
    def __call__(self, object obj, *args, **kwargs):
        try:
            return self.get_handler(type(obj))(obj, *args, **kwargs)
        except KeyError:
            if hasattr(obj, '__maxframe_tokenize__') and not isinstance(obj, type):
                if len(args) == 0 and len(kwargs) == 0:
                    return obj.__maxframe_tokenize__()
                else:
                    obj = obj.__maxframe_tokenize__()
                    return self.get_handler(type(obj))(obj, *args, **kwargs)
            if callable(obj):
                if PDTick is not None and not isinstance(obj, PDTick):
                    return tokenize_function(obj)
            try:
                return cloudpickle.dumps(obj)
            except:
                raise TypeError(f'Cannot generate token for {obj}, type: {type(obj)}') from None


cdef inline list iterative_tokenize(object ob):
    cdef list dq = [ob]
    cdef int dq_pos = 0
    cdef list h_list = []
    while dq_pos < len(dq):
        x = dq[dq_pos]
        dq_pos += 1
        if type(x) in _primitive_types:
            h_list.append(x)
        elif isinstance(x, (list, tuple)):
            dq.extend(x)
        elif isinstance(x, set):
            dq.extend(sorted(x))
        elif isinstance(x, dict):
            dq.extend(sorted(x.items()))
        else:
            h_list.append(tokenize_handler(x))

        if dq_pos >= 64 and len(dq) < dq_pos * 2:  # pragma: no cover
            dq = dq[dq_pos:]
            dq_pos = 0
    return h_list


cdef inline tuple tokenize_numpy(ob):
    cdef int offset

    if not ob.shape:
        return str(ob), ob.dtype

    if hasattr(ob, 'mode') and getattr(ob, 'filename', None):
        if hasattr(ob.base, 'ctypes'):
            offset = (ob.ctypes.get_as_parameter().value -
                      ob.base.ctypes.get_as_parameter().value)
        else:
            offset = 0  # root memmaps have mmap object as misc
        return (ob.filename, os.path.getmtime(ob.filename), ob.dtype,
                ob.shape, ob.strides, offset)
    if ob.dtype.hasobject:
        try:
            data = mmh_hash_bytes('-'.join(ob.flat).encode('utf-8', errors='surrogatepass'))
        except UnicodeDecodeError:
            data = mmh_hash_bytes(b'-'.join([to_binary(x) for x in ob.flat]))
        except TypeError:
            try:
                data = mmh_hash_bytes(pickle.dumps(ob, pickle.HIGHEST_PROTOCOL))
            except:
                # nothing more we can do, generate a uuid
                data = uuid.uuid4().hex
    else:
        try:
            data = mmh_hash_bytes(ob.ravel().view('i1').data)
        except (BufferError, AttributeError, ValueError):
            data = mmh_hash_bytes(ob.copy().ravel().view('i1').data)
    return data, ob.dtype, ob.shape, ob.strides


cdef inline _extract_range_index_attr(object range_index, str attr):
    try:
        return getattr(range_index, attr)
    except AttributeError:  # pragma: no cover
        return getattr(range_index, '_' + attr)


cdef list tokenize_pandas_index(ob):
    cdef long long start
    cdef long long stop
    cdef long long end

    if isinstance(ob, pd.RangeIndex):
        start = _extract_range_index_attr(ob, 'start')
        stop = _extract_range_index_attr(ob, 'stop')
        step = _extract_range_index_attr(ob, 'step')
        # for a range index there is no need to get the values
        return iterative_tokenize([ob.name, getattr(ob, 'names', None),
                                   slice(start, stop, step)])
    else:
        return iterative_tokenize([ob.name, getattr(ob, 'names', None), ob.values])


cdef list tokenize_pandas_series(ob):
    return iterative_tokenize([ob.name, ob.dtype, ob.values, ob.index])


cdef list tokenize_pandas_dataframe(ob):
    l = [block.values for block in ob._data.blocks]
    l.extend([ob.columns, ob.index])
    return iterative_tokenize(l)


cdef list tokenize_pandas_categorical(ob):
    l = ob.to_list()
    l.append(ob.shape)
    return iterative_tokenize(l)


cdef list tokenize_pd_extension_dtype(ob):
    return iterative_tokenize([ob.name])


cdef list tokenize_categories_dtype(ob):
    return iterative_tokenize([ob.categories, ob.ordered])


cdef list tokenize_interval_dtype(ob):
    return iterative_tokenize([type(ob).__name__, ob.subtype])


cdef list tokenize_pandas_time_arrays(ob):
    return iterative_tokenize([ob.asi8, ob.dtype])


cdef list tokenize_pandas_tick(ob):
    return iterative_tokenize([ob.freqstr])


cdef list tokenize_pandas_interval_arrays(ob):  # pragma: no cover
    if _has_interval_array_inclusive:
        return iterative_tokenize([ob.left, ob.right, ob.inclusive])
    else:
        return iterative_tokenize([ob.left, ob.right, ob.closed])


cdef list tokenize_sqlalchemy_data_type(ob):
    return iterative_tokenize([repr(ob)])


cdef list tokenize_sqlalchemy_selectable(ob):
    return iterative_tokenize([str(ob)])


cdef list tokenize_enum(ob):
    cls = type(ob)
    return iterative_tokenize([id(cls), cls.__name__, ob.name])


@lru_cache(500)
def tokenize_function(ob):
    if isinstance(ob, partial):
        args = iterative_tokenize(ob.args)
        keywords = iterative_tokenize(ob.keywords.items()) if ob.keywords else None
        return tokenize_function(ob.func), args, keywords
    else:
        try:
            if isinstance(ob, types.FunctionType):
                return iterative_tokenize([pickle.dumps(ob, protocol=0), id(ob)])
            else:
                return pickle.dumps(ob, protocol=0)
        except:
            pass
    try:
        return cloudpickle.dumps(ob, protocol=0)
    except:
        return str(ob)


@lru_cache(500)
def tokenize_pickled_with_cache(ob):
    return pickle.dumps(ob)


def tokenize_cupy(ob):
    from .serialization import serialize
    header, _buffers = serialize(ob)
    return iterative_tokenize([header, ob.data.ptr])


def tokenize_cudf(ob):
    from .serialization import serialize
    header, buffers = serialize(ob)
    return iterative_tokenize([header] + [(buf.ptr, buf.size) for buf in buffers])


cdef Tokenizer tokenize_handler = Tokenizer()
cdef set _primitive_types = {
    int, float, str, unicode, bytes, complex, type(None), type, slice,
    date, datetime, timedelta
}


for t in _primitive_types:
    tokenize_handler.register(t, lambda ob: ob)
for t in (np.dtype, np.generic):
    tokenize_handler.register(t, lambda ob: ob)
for t in (list, tuple, dict, set):
    tokenize_handler.register(t, iterative_tokenize)

tokenize_handler.register(np.ndarray, tokenize_numpy)
tokenize_handler.register(np.random.RandomState, lambda ob: iterative_tokenize(ob.get_state()))
tokenize_handler.register(memoryview, lambda ob: mmh3_hash_from_buffer(ob))
tokenize_handler.register(Enum, tokenize_enum)
tokenize_handler.register(pd.Index, tokenize_pandas_index)
tokenize_handler.register(pd.Series, tokenize_pandas_series)
tokenize_handler.register(pd.DataFrame, tokenize_pandas_dataframe)
tokenize_handler.register(pd.Categorical, tokenize_pandas_categorical)
tokenize_handler.register(pd.CategoricalDtype, tokenize_categories_dtype)
tokenize_handler.register(pd.IntervalDtype, tokenize_interval_dtype)
tokenize_handler.register(tzinfo, tokenize_pickled_with_cache)
tokenize_handler.register(pd.arrays.DatetimeArray, tokenize_pandas_time_arrays)
tokenize_handler.register(pd.arrays.TimedeltaArray, tokenize_pandas_time_arrays)
tokenize_handler.register(pd.arrays.PeriodArray, tokenize_pandas_time_arrays)
tokenize_handler.register(pd.arrays.IntervalArray, tokenize_pandas_interval_arrays)
tokenize_handler.register(pd.api.extensions.ExtensionDtype, tokenize_pd_extension_dtype)

if _has_cupy:
    tokenize_handler.register('cupy.ndarray', tokenize_cupy)
if _has_cudf:
    tokenize_handler.register('cudf.DataFrame', tokenize_cudf)
    tokenize_handler.register('cudf.Series', tokenize_cudf)
    tokenize_handler.register('cudf.Index', tokenize_cudf)
if PDTick is not None:
    tokenize_handler.register(PDTick, tokenize_pandas_tick)
if _has_sqlalchemy:
    tokenize_handler.register(
        "sqlalchemy.sql.sqltypes.TypeEngine", tokenize_sqlalchemy_data_type
    )
    tokenize_handler.register(
        "sqlalchemy.sql.Selectable", tokenize_sqlalchemy_selectable
    )


cpdef register_tokenizer(cls, handler):
    tokenize_handler.register(cls, handler)


@cython.nonecheck(False)
@cython.cdivision(True)
cpdef long long ceildiv(long long x, long long y) nogil:
    return x // y + (x % y != 0)


cdef class Timer:
    cdef object _start
    cdef readonly object duration

    def __enter__(self):
        self._start = time.time()
        return self

    def __exit__(self, *_):
        self.duration = time.time() - self._start


cdef mt19937_64 _rnd_gen
cdef bint _rnd_is_seed_set = False


cpdef void reset_id_random_seed() except *:
    cdef bytes seed_bytes
    global _rnd_is_seed_set
    seed_bytes = getrandbits(64).to_bytes(8, "little")
    _rnd_gen.seed((<uint_fast64_t *><char *>seed_bytes)[0])
    _rnd_is_seed_set = True


cpdef bytes new_random_id(int byte_len):
    cdef uint_fast64_t *res_ptr
    cdef uint_fast64_t res_data[4]
    cdef int i, qw_num = byte_len >> 3
    cdef bytes res

    if not _rnd_is_seed_set:
        reset_id_random_seed()

    if (qw_num << 3) < byte_len:
        qw_num += 1

    if qw_num <= 4:
        # use stack memory to accelerate
        res_ptr = res_data
    else:
        res_ptr = <uint_fast64_t *>malloc(qw_num << 3)

    try:
        for i in range(qw_num):
            res_ptr[i] = _rnd_gen()
        return <bytes>((<char *>&(res_ptr[0]))[:byte_len])
    finally:
        # free memory if allocated by malloc
        if res_ptr != res_data:
            free(res_ptr)
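

# Usage sketch (illustrative only): the timing, arithmetic, and ID helpers
# defined above. `do_work()` and the chosen byte length are hypothetical
# placeholders, not values required by this module.
#
#     with Timer() as timer:
#         do_work()
#     print(timer.duration)            # elapsed wall-clock seconds
#
#     ceildiv(7, 2)                    # -> 4, integer division rounded up
#     worker_id = new_random_id(24)    # 24 random bytes from the seeded mt19937_64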
cdef str _package_root = os.path.dirname(__file__)


def get_user_call_point():
    cdef str filename
    cdef object cur_frame = inspect.currentframe()
    while cur_frame is not None:
        filename = cur_frame.f_code.co_filename
        if not filename.startswith(_package_root):
            break
        cur_frame = cur_frame.f_back
    return cur_frame


__all__ = [
    'ceildiv', 'get_user_call_point', 'new_random_id', 'register_tokenizer',
    'reset_id_random_seed', 'to_str', 'to_binary', 'to_text', 'tokenize',
    'tokenize_int', 'Timer', 'TypeDispatcher',
]
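

# Usage sketch (illustrative only): hooking a user-defined class into the
# module-level tokenizer. `Record` and its fields are hypothetical; either a
# registered handler or a __maxframe_tokenize__ method makes instances
# tokenizable by tokenize().
#
#     class Record:
#         def __init__(self, key, value):
#             self.key, self.value = key, value
#
#         def __maxframe_tokenize__(self):
#             return [self.key, self.value]    # picked up by Tokenizer.__call__
#
#     # equivalent alternative: register an explicit handler
#     register_tokenizer(Record, lambda ob: [ob.key, ob.value])
#
#     tokenize(Record("a", 1))                 # deterministic hex digest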