core/maxframe/tensor/random/core.py (154 lines of code) (raw):

# Copyright 1999-2025 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from collections.abc import Iterable from contextlib import contextmanager import numpy as np from ...serialization.serializables import FieldTypes, Int32Field, TupleField from ..core import TENSOR_TYPE from ..datasource import tensor as astensor from ..misc import broadcast_to from ..operators import TensorMapReduceOperator, TensorOperator, TensorOperatorMixin from ..utils import broadcast_shape class RandomState: def __init__(self, seed=None): self._random_state = np.random.RandomState(seed=seed) def seed(self, seed=None): """ Seed the generator. This method is called when `RandomState` is initialized. It can be called again to re-seed the generator. For details, see `RandomState`. Parameters ---------- seed : int or 1-d array_like, optional Seed for `RandomState`. Must be convertible to 32 bit unsigned integers. See Also -------- RandomState """ self._random_state.seed(seed=seed) def to_numpy(self): return self._random_state @classmethod def from_numpy(cls, np_random_state): state = RandomState() state._random_state = np_random_state return state @classmethod def _handle_size(cls, size): if size is None: return size try: return tuple(int(s) for s in size) except TypeError: return (size,) _random_state = RandomState() def handle_array(arg): if not isinstance(arg, TENSOR_TYPE): if not isinstance(arg, Iterable): return arg arg = np.asarray(arg) return arg[(0,) * max(1, arg.ndim)] elif hasattr(arg, "op") and hasattr(arg.op, "data"): return arg.op.data[(0,) * max(1, arg.ndim)] return np.empty((0,), dtype=arg.dtype) class TensorRandomOperatorMixin(TensorOperatorMixin): __slots__ = () def _calc_shape(self, shapes): shapes = list(shapes) if getattr(self, "size", None) is not None: shapes.append(getattr(self, "size")) return broadcast_shape(*shapes) @classmethod def _handle_arg(cls, arg, chunk_size): if isinstance(arg, (list, np.ndarray)): arg = astensor(arg, chunk_size=chunk_size) return arg @contextmanager def _get_inputs_shape_by_given_fields( self, inputs, shape, raw_chunk_size=None, tensor=True ): fields = getattr(self, "_input_fields_", []) to_one_chunk_fields = set(getattr(self, "_into_one_chunk_fields_", list())) field_to_obj = dict() to_broadcast_shapes = [] if fields: if getattr(self, fields[0], None) is None: # create from beginning for field, val in zip(fields, inputs): if field not in to_one_chunk_fields: if isinstance(val, list): val = np.asarray(val) if tensor: val = self._handle_arg(val, raw_chunk_size) if isinstance(val, TENSOR_TYPE): field_to_obj[field] = val if field not in to_one_chunk_fields: to_broadcast_shapes.append(val.shape) setattr(self, field, val) else: inputs_iter = iter(inputs) for field in fields: if isinstance(getattr(self, field), TENSOR_TYPE): field_to_obj[field] = next(inputs_iter) if tensor: if shape is None: shape = self._calc_shape(to_broadcast_shapes) for field, inp in field_to_obj.items(): if field not in to_one_chunk_fields: field_to_obj[field] = broadcast_to(inp, shape) yield [field_to_obj[f] for f in fields if f in field_to_obj], shape inputs_iter = iter(getattr(self, "_inputs")) for field in fields: if field in field_to_obj: setattr(self, field, next(inputs_iter)) @classmethod def _get_shape(cls, kws, kw): if kw.get("shape") is not None: return kw.get("shape") elif kws is not None and len(kws) > 0: return kws[0].get("shape") def _new_tileables(self, inputs, kws=None, **kw): raw_chunk_size = kw.get("chunk_size", None) shape = self._get_shape(kws, kw) with self._get_inputs_shape_by_given_fields( inputs, shape, raw_chunk_size, True ) as (inputs, shape): kw["shape"] = shape return super()._new_tileables(inputs, kws=kws, **kw) def _on_serialize_random_state(rs): return rs.get_state() if rs is not None else None def _on_deserialize_random_state(tup): if tup is None: return None rs = np.random.RandomState() rs.set_state(tup) return rs def RandomStateField(name, **kwargs): kwargs.update( dict( on_serialize=_on_serialize_random_state, on_deserialize=_on_deserialize_random_state, ) ) return TupleField(name, **kwargs) class TensorSeedOperatorMixin(object): @property def seed(self): return getattr(self, "seed", None) @property def args(self): if hasattr(self, "_fields_"): return self._fields_ else: return [ field for field in self._FIELDS if field not in TensorRandomOperator._FIELDS ] class TensorRandomOperator(TensorSeedOperatorMixin, TensorOperator): seed = Int32Field("seed") def __init__(self, dtype=None, **kw): dtype = np.dtype(dtype) if dtype is not None else dtype if "state" in kw: kw["_state"] = kw.pop("state") super().__init__(dtype=dtype, **kw) class TensorRandomMapReduceOperator(TensorSeedOperatorMixin, TensorMapReduceOperator): seed = Int32Field("seed") def __init__(self, dtype=None, **kw): dtype = np.dtype(dtype) if dtype is not None else dtype if "state" in kw: kw["_state"] = kw.pop("state") super().__init__(dtype=dtype, **kw) class TensorDistribution(TensorRandomOperator): size = TupleField("size", FieldTypes.int64) class TensorSimpleRandomData(TensorRandomOperator): size = TupleField("size", FieldTypes.int64) def __init__(self, size=None, **kw): if type(size) is int: size = (size,) super().__init__(size=size, **kw)