# core/maxframe/tensor/utils.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import itertools
import operator
from collections import OrderedDict
from collections.abc import Iterable
from functools import wraps
from math import ceil
from numbers import Integral
from typing import Dict, List, Union
import numpy as np
from ..core import ExecutableTuple
from ..lib.mmh3 import hash_from_buffer
from ..utils import lazy_import
cp = lazy_import("cupy", rename="cp")
def normalize_shape(shape):
if isinstance(shape, Iterable):
return tuple(shape)
else:
return (shape,)
def normalize_chunk_sizes(shape, chunk_size):
shape = normalize_shape(shape)
if not isinstance(chunk_size, tuple):
if isinstance(chunk_size, Iterable):
chunk_size = tuple(chunk_size)
elif isinstance(chunk_size, int):
chunk_size = (chunk_size,) * len(shape)
if len(shape) != len(chunk_size):
raise ValueError(
"Chunks must have the same dimemsion, "
f"got shape: {shape}, chunks: {chunk_size}"
)
chunk_sizes = []
for size, chunk in zip(shape, chunk_size):
if isinstance(chunk, Iterable):
if not isinstance(chunk, tuple):
chunk = tuple(chunk)
# if chunk is (np.nan,), it means we need to concat
# all chunks together.
if chunk == (np.nan,):
chunk = (size,)
if sum(chunk) != size:
raise ValueError(
"chunks shape should be of the same length, "
f"got shape: {size}, chunks: {chunk}"
)
chunk_sizes.append(chunk)
else:
assert isinstance(chunk, int)
if size == 0:
sizes = (0,)
else:
sizes = tuple(chunk for _ in range(int(size / chunk))) + (
tuple() if size % chunk == 0 else (size % chunk,)
)
chunk_sizes.append(sizes)
return tuple(chunk_sizes)
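# A small behavior sketch for ``normalize_chunk_sizes`` (values are
# illustrative, not taken from the test suite):
#   >>> normalize_chunk_sizes((11, 8), 4)
#   ((4, 4, 3), (4, 4))
#   >>> normalize_chunk_sizes((10,), ((3, 3, 4),))
#   ((3, 3, 4),)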
def broadcast_shape(*shapes):
if len(shapes) == 1:
return shapes[0]
out_shapes = []
for ss in itertools.zip_longest(*[reversed(s) for s in shapes], fillvalue=-1):
shape = max(s for s in ss if s != -1)
if any(i != -1 and i != 1 and i != shape and not np.isnan(i) for i in ss):
raise ValueError(
"Operators could not be broadcast together "
"with shape {0}".format(" ".join(map(str, shapes)))
)
out_shapes.append(shape)
return tuple(reversed(out_shapes))
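# A hedged example of the broadcasting rule implemented above (shapes are
# aligned from the right and size-1 dimensions stretch):
#   >>> broadcast_shape((5, 1, 4), (3, 1))
#   (5, 3, 4)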
def get_chunk_slices(nsplits, idx):
    return tuple(
        slice(sum(nsplit[:ix]), sum(nsplit[: ix + 1]))
        for ix, nsplit in zip(idx, nsplits)
    )
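# A minimal sketch, assuming the nsplits format produced above: chunk (1, 0)
# of a tensor split as ((4, 4, 3), (4, 4)) covers rows 4:8 and columns 0:4:
#   >>> get_chunk_slices(((4, 4, 3), (4, 4)), (1, 0))
#   (slice(4, 8, None), slice(0, 4, None))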
def gen_random_seeds(n, random_state):
assert isinstance(random_state, np.random.RandomState)
return tuple(np.frombuffer(random_state.bytes(n * 4), dtype=np.uint32).tolist())
def validate_axis(ndim, axis, argname=None):
if axis >= ndim or axis < -ndim:
raise np.AxisError(axis, ndim=ndim, msg_prefix=argname)
return axis if axis >= 0 else ndim + axis
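# Illustrative values: a negative axis is wrapped, while an out-of-range
# axis raises ``np.AxisError``:
#   >>> validate_axis(3, -1)
#   2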
def normalize_axis_tuple(axis, ndim, argname=None, allow_duplicate=False):
"""
    Normalizes an axis argument into a tuple of non-negative integer axes.

    This handles shorthands such as ``1`` and converts them to ``(1,)``,
    as well as performing the handling of negative indices covered by
    `normalize_axis_index`.

    By default, this forbids axes from being specified multiple times.

    Used internally by multi-axis-checking logic.

    Parameters
    ----------
    axis : int, iterable of int
        The un-normalized index or indices of the axis.
    ndim : int
        The number of dimensions of the array that `axis` should be normalized
        against.
    argname : str, optional
        A prefix to put before the error message, typically the name of the
        argument.
    allow_duplicate : bool, optional
        If False, the default, disallow an axis from being specified twice.

    Returns
    -------
    normalized_axes : tuple of int
        The normalized axis index, such that `0 <= normalized_axis < ndim`.

    Raises
    ------
    AxisError
        If any axis provided is out of range.
    ValueError
        If an axis is repeated.

    See also
    --------
    normalize_axis_index : normalizing a single scalar axis
"""
# Optimization to speed-up the most common cases.
if type(axis) not in (tuple, list):
try:
axis = [operator.index(axis)]
except TypeError:
pass
# Going via an iterator directly is slower than via list comprehension.
axis = tuple([validate_axis(ndim, ax, argname) for ax in axis])
if not allow_duplicate and len(set(axis)) != len(axis):
if argname:
raise ValueError(f"repeated axis in `{argname}` argument")
else:
raise ValueError("repeated axis")
return axis
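# Behavior sketch (illustrative):
#   >>> normalize_axis_tuple(-1, 3)
#   (2,)
#   >>> normalize_axis_tuple((0, -2), 3)
#   (0, 1)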
def validate_order(dtype, order):
if getattr(dtype, "fields", None) is None:
if order is not None:
raise ValueError("Cannot specify order when the array has no fields")
else:
return
need_check = True
if order is None:
order = list(dtype.names)
need_check = False
elif isinstance(order, (list, tuple)):
order = list(order)
else:
order = [order]
if need_check:
for o in order:
if o not in dtype.fields:
raise ValueError(f"unknown field name: {o}")
return order
def inject_dtype(dtype):
def inner(func):
@wraps(func)
def call(*tensors, **kw):
kw["dtype"] = np.dtype(dtype)
ret = func(*tensors, **kw)
if ret is NotImplemented:
reverse_func = getattr(
inspect.getmodule(func), f"r{func.__name__}", None
)
if reverse_func is not None:
ret = reverse_func(*tensors[::-1], **kw)
if ret is NotImplemented:
raise TypeError(
"unsupported operator type(s) for {0}: '{1}' and '{2}".format(
func.__name__, *[type(t) for t in tensors]
)
)
return ret
return call
return inner
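# A hedged usage sketch for ``inject_dtype``: comparison-style operators can
# be wrapped so their result dtype is forced to bool (the decorated name
# below is illustrative, not a real call site in this module):
#
#   @inject_dtype(np.bool_)
#   def lt(x1, x2, **kw):
#       ...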
def infer_dtype(np_func, multi_outputs=False, empty=True, reverse=False, check=True):
def make_arg(arg):
if empty:
return np.empty((1,) * max(1, arg.ndim), dtype=arg.dtype)
else:
if hasattr(arg, "op") and hasattr(arg.op, "data"):
arg = arg.op.data
return arg[(0,) * max(1, arg.ndim)]
tensor_ufunc = "__tensor_ufunc__"
def is_arg(arg):
if hasattr(arg, tensor_ufunc):
return False
return hasattr(arg, "ndim") and hasattr(arg, "dtype")
def inner(func):
@wraps(func)
def h(*tensors, **kw):
usr_dtype = np.dtype(kw.pop("dtype")) if "dtype" in kw else None
args = [make_arg(t) if is_arg(t) else t for t in tensors]
if reverse:
args = args[::-1]
np_kw = dict(
(k, make_arg(v) if hasattr(v, "op") else v)
for k, v in kw.items()
if is_arg(v) and k != "out"
)
dtype = None
if not any(
hasattr(arg, tensor_ufunc)
for arg in itertools.chain(args, np_kw.values())
):
                # skip dtype inference if we encounter a maxframe DataFrame,
                # etc., that implements __tensor_ufunc__
try:
with np.errstate(all="ignore"):
if multi_outputs:
dtype = np_func(*args, **np_kw)[0].dtype
else:
dtype = np_func(*args, **np_kw).dtype
except: # noqa: E722
dtype = None
if usr_dtype and dtype:
can_cast_kwargs = {}
if kw.get("casting") is not None:
can_cast_kwargs["casting"] = kw.get("casting")
if check and not np.can_cast(dtype, usr_dtype, **can_cast_kwargs):
raise TypeError(
"No loop matching the specified signature "
f"and casting was found for ufunc {np_func}"
)
kw["dtype"] = usr_dtype
else:
kw["dtype"] = dtype
ret = func(*tensors, **kw)
if ret is NotImplemented:
reverse_func = (
getattr(inspect.getmodule(func), f"r{func.__name__}", None)
if not reverse
else None
)
if reverse_func is not None:
ret = reverse_func(*tensors[::-1], **kw)
if ret is NotImplemented:
raise TypeError(
"unsupported operator type(s) for {0}: '{1}' and '{2}".format(
func.__name__, *[type(t) for t in tensors]
)
)
return ret
return h
return inner
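# A hedged usage sketch for ``infer_dtype``: a tensor-level binary operator
# can be wrapped so its output dtype is inferred by probing the matching
# NumPy ufunc on tiny placeholder arrays (the decorated name is illustrative):
#
#   @infer_dtype(np.add)
#   def add(x1, x2, out=None, where=None, **kw):
#       ...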
def index_ndim(index):
from .core import Tensor
if isinstance(index, Tensor) and index.dtype == np.bool_:
# boolean indexing will occupy the ndim
return index.ndim
return 1 if index is not None else 0
def replace_ellipsis(index, ndim):
    all_ellipsis = list(i for i, idx in enumerate(index) if idx is Ellipsis)
    if len(all_ellipsis) > 1:
        raise IndexError("an index can only have a single ellipsis ('...')")
    if not all_ellipsis:
        return index
    ellipsis_index = all_ellipsis[0]
n_extra = ndim - sum([index_ndim(i) for i in index]) + 1
return (
        index[:ellipsis_index] + (slice(None),) * n_extra + index[ellipsis_index + 1 :]
)
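# Behavior sketch (illustrative): the ellipsis expands into enough full
# slices to index a 4-d tensor:
#   >>> replace_ellipsis((0, Ellipsis, slice(None)), 4)
#   (0, slice(None, None, None), slice(None, None, None), slice(None, None, None))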
def calc_sliced_size(size: int, sliceobj: slice) -> int:
if np.isnan(size):
return np.nan
start, stop, step = sliceobj.indices(size)
return int(ceil(abs((stop - start) / float(step))))
def calc_object_length(obj, size=None):
if np.isscalar(obj):
return 1
elif isinstance(obj, slice):
return calc_sliced_size(size, obj)
else:
return len(obj)
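# Illustrative values: slice(1, None, 3) over an axis of size 10 selects
# indices 1, 4 and 7, while a list's length is returned directly:
#   >>> calc_sliced_size(10, slice(1, None, 3))
#   3
#   >>> calc_object_length([3, 5, 7])
#   3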
def slice_split(
index: Union[int, slice], sizes: List[int]
) -> Dict[int, Union[int, slice]]:
size = sum(sizes)
if isinstance(index, Integral):
index = index if index >= 0 else size + index
i = 0
ind = index
lens = list(sizes)
while ind >= lens[0]:
i += 1
ind -= lens.pop(0)
return {i: ind}
assert isinstance(index, slice)
start, stop, step = index.indices(size)
slice_all = slice(None)
if index == slice_all:
return dict((k, slice_all) for k in range(len(sizes)))
d = dict()
if step > 0:
for i, length in enumerate(sizes):
if start < length and stop > 0:
d[i] = slice(start, min(stop, length), step)
start = (start - length) % step
else:
start = start - length
stop -= length
else:
rstart = start # running start
chunk_boundaries = np.cumsum(sizes)
for i, chunk_stop in reversed(list(enumerate(chunk_boundaries))):
# create a chunk start and stop
if i == 0:
chunk_start = 0
else:
chunk_start = chunk_boundaries[i - 1]
# if our slice is in this chunk
if (chunk_start <= rstart < chunk_stop) and (rstart > stop):
d[i] = slice(
rstart - chunk_stop,
max(chunk_start - chunk_stop - 1, stop - chunk_stop),
step,
)
            # compute the next running start point
offset = (rstart - (chunk_start - 1)) % step
rstart = chunk_start + offset - 1
# replace 0:20:1 with : if appropriate
for k, v in d.items():
if v == slice(0, sizes[k], 1):
d[k] = slice(None, None, None)
if not d: # special case x[:0]
d[0] = slice(0, 0, 1)
return d
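# A hedged sketch of how a global index maps onto per-chunk indexes,
# assuming three chunks of size 4 (values verified by hand):
#   >>> slice_split(slice(2, 9), [4, 4, 4])
#   {0: slice(2, 4, 1), 1: slice(None, None, None), 2: slice(0, 1, 1)}
#   >>> slice_split(5, [4, 4, 4])
#   {1: 1}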
def is_asc_sorted(arr):
arr = np.asarray(arr)
if len(arr) == 0:
return True
return np.all(arr[:-1] <= arr[1:])
def split_indexes_into_chunks(nsplits, indexes, ret_is_asc=True):
indexes = np.asarray(indexes)
chunk_idxes = np.empty_like(indexes)
cum_nsplits = [np.cumsum(nsplit) for nsplit in nsplits]
for i, cum_nsplit, index in zip(itertools.count(0), cum_nsplits, indexes):
# handle negative value in index
if hasattr(index, "flags") and not index.flags.writeable:
index = index.copy()
index = np.add(index, cum_nsplit[-1], out=index, where=index < 0)
sorted_idx = np.argsort(index)
if np.any(index >= cum_nsplit[-1]):
idx = index[index >= cum_nsplit[-1]][0]
raise IndexError(f"index {idx} is out of bounds with size {cum_nsplit[-1]}")
chunk_idx = np.searchsorted(cum_nsplit, index[sorted_idx], side="right")
chunk_idxes[i, sorted_idx] = chunk_idx
chunk_idxes_asc = False
if ret_is_asc:
chunk_idxes_asc = is_asc_sorted(np.lexsort(chunk_idxes[::-1]))
chunk_index_to_indexes = OrderedDict()
chunk_index_to_poses = OrderedDict()
poses = np.arange(len(indexes[0]))
for idx in itertools.product(*(range(len(nsplit)) for nsplit in nsplits)):
cond = (chunk_idxes == np.array(idx).reshape((len(idx), 1))).all(axis=0)
filtered = indexes[:, cond]
for i in range(len(indexes)):
filtered[i] = filtered[i] - (
cum_nsplits[i][idx[i] - 1] if idx[i] > 0 else 0
)
chunk_index_to_indexes[idx] = filtered
chunk_index_to_poses[idx] = poses[cond]
if ret_is_asc:
return chunk_index_to_indexes, chunk_index_to_poses, chunk_idxes_asc
return chunk_index_to_indexes, chunk_index_to_poses
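# Illustrative sketch: global fancy indexes [1, 5, 2] over one axis split as
# (4, 4) are regrouped per chunk, with original positions kept for reordering:
#   >>> idx_map, pos_map, asc = split_indexes_into_chunks(((4, 4),), [[1, 5, 2]])
#   >>> idx_map[(0,)]        # 1 and 2 fall into chunk 0
#   array([[1, 2]])
#   >>> idx_map[(1,)]        # 5 becomes local index 1 inside chunk 1
#   array([[1]])
#   >>> asc                  # chunk order does not ascend (1 -> 5 -> 2)
#   False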
def calc_pos(fancy_index_shape, pos, xp=np):
if isinstance(pos, dict):
pos = xp.concatenate(list(pos.values()))
select_pos = xp.empty(fancy_index_shape, dtype=int)
select_pos.flat[pos] = xp.arange(select_pos.size)
return select_pos
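# Illustrative values: given the concatenation order [2, 0, 1] of fancy-index
# results, calc_pos yields the permutation restoring the requested order:
#   >>> calc_pos((3,), np.array([2, 0, 1]))
#   array([1, 2, 0])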
def decide_unify_split(*splits):
    # TODO (jisheng): in the future, we need a more sophisticated way to decide the rechunk split;
    # right now, for (2, 2) and (3, 1), we get the rechunk split as (2, 1, 1)
if not splits:
return ()
raw_splits = splits
# support broadcasting rules
    # decide_unify_split((1,), (5,)) --> (5,)
splits = set(s for s in splits if ((len(s) > 1) or (len(s) == 1 and s[0] != 1)))
if len(splits) == 1:
return splits.pop()
if len(splits) == 0:
return raw_splits[0]
if any(np.isnan(sum(s)) for s in splits):
raise ValueError(f"Tensor chunk sizes are unknown: {splits}")
if len(set(sum(s) for s in splits)) > 1:
raise ValueError(f"Splits not of same size: {splits}")
q = [list(s) for s in splits]
size = sum(q[0])
cum = 0
res = []
while cum < size:
m = min(s[0] for s in q)
res.append(m)
for s in q:
s[0] -= m
if s[0] == 0:
s.pop(0)
cum += m
return tuple(res)
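# Behavior sketch (illustrative): the unified split is the finest common
# refinement of the inputs, and length-1 splits follow broadcasting rules:
#   >>> decide_unify_split((2, 2), (3, 1))
#   (2, 1, 1)
#   >>> decide_unify_split((1,), (5,))
#   (5,)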
def check_out_param(out, t, casting):
from .misc import broadcast_to
if not hasattr(out, "shape"):
raise TypeError("return arrays must be a tensor")
try:
broadcast_to(t, out.shape)
except ValueError:
raise ValueError(
"operators could not be broadcast together "
"with shapes ({0}) ({1})".format(
",".join(str(s) for s in t.shape), ",".join(str(s) for s in out.shape)
)
)
if not np.can_cast(t.dtype, out.dtype, casting):
raise TypeError(
f"output (typecode '{t.dtype.char}') could not be coerced "
f"to provided output parameter (typecode '{out.dtype.char}') "
f"according to the casting rule ''{casting}''"
)
def check_random_state(seed):
"""
    Turn seed into a mt.random.RandomState instance.

    :param seed:
        If seed is None, return the RandomState singleton used by mt.random.
        If seed is an int, return a new RandomState instance seeded with seed.
        If seed is already a RandomState instance, return it.
        Otherwise raise ValueError.
    :return: a mt.random.RandomState instance
"""
from numpy import random as np_mtrand
from . import random as mtrand
if seed is None or seed is mtrand or seed is np_mtrand:
return mtrand._random_state
if isinstance(seed, (Integral, np.integer)):
return mtrand.RandomState(seed)
if isinstance(seed, np.random.RandomState):
return mtrand.RandomState.from_numpy(seed)
if isinstance(seed, mtrand.RandomState):
return seed
raise ValueError(f"{seed} cannot be used to seed a mt.random.RandomState instance")
def filter_inputs(inputs):
from ..core import ENTITY_TYPE
return [inp for inp in inputs if isinstance(inp, ENTITY_TYPE)]
# this function is only used for compatibility with pandas
def to_numpy(pdf):
try:
return pdf.to_numpy()
except AttributeError: # pragma: no cover
return pdf.values
def check_order(order_str, available_options="KACF", err_msg="order not understood"):
order_str = order_str.upper()
if order_str not in available_options:
raise TypeError(err_msg)
def get_order(
order_str, to_keep_order, available_options="KACF", err_msg="order not understood"
):
from .core import TensorOrder
check_order(order_str, available_options=available_options, err_msg=err_msg)
if order_str in "KA":
return to_keep_order
elif order_str == "C":
return TensorOrder.C_ORDER
else:
return TensorOrder.F_ORDER
def reverse_order(old_order):
from .core import TensorOrder
assert isinstance(old_order, TensorOrder)
return (
TensorOrder.C_ORDER if old_order == TensorOrder.F_ORDER else TensorOrder.F_ORDER
)
def hash_on_axis(ar, axis, n_dest):
ar = np.asarray(ar)
# cannot be scalar
assert ar.ndim > 0
axis = validate_axis(ar.ndim, axis)
if n_dest == 1:
return np.zeros(ar.shape[axis], dtype=np.uint32)
if ar.ndim > 2:
ret = np.empty(ar.shape[axis], dtype=np.uint32)
def _hash_to_dest(data):
i = data[0]
idx = (slice(None),) * axis + (i,)
ret[i] = hash_from_buffer(memoryview(ar[idx])) % n_dest
np.apply_along_axis(_hash_to_dest, 0, np.arange(ar.shape[axis])[np.newaxis, :])
return ret
else:
def _hash_to_dest(data):
return hash_from_buffer(memoryview(data)) % n_dest
if ar.ndim == 1:
ar = ar.reshape(ar.size, 1)
return np.apply_along_axis(_hash_to_dest, 1 - axis, ar)
def fetch_corner_data(tensor, session=None):
print_option = np.get_printoptions()
# only fetch corner data when data > threshold
threshold = print_option["threshold"]
# number of edge items to print
edgeitems = print_option["edgeitems"]
    # we fetch corner data based on the fact that
    # the tensor must have been executed,
    # thus its size cannot be NaN
if tensor.size > threshold:
        # two edges for each axis
indices_iter = list(itertools.product(*(range(2) for _ in range(tensor.ndim))))
corners = np.empty(shape=(2,) * tensor.ndim, dtype=object)
shape = [0 for _ in range(tensor.ndim)]
for indices in indices_iter:
slc = []
for ax, i in enumerate(indices):
size = tensor.shape[ax]
if size > edgeitems * 2 + 2:
# fetch two more elements
if i == 0:
slc.append(slice(edgeitems + 1))
else:
slc.append(slice(-edgeitems - 1, None))
shape[ax] += edgeitems + 1
else:
i_sep = size // 2
if i == 0:
slc.append(slice(i_sep))
shape[ax] += i_sep
else:
slc.append(slice(i_sep, None))
shape[ax] += size - i_sep
corners[indices] = tensor[tuple(slc)]
# fetch together
fetched = ExecutableTuple(corners.flat).fetch(session=session)
for indices, f in zip(indices_iter, fetched):
corners[indices] = f
return np.block(corners.tolist())
else:
return tensor.fetch(session=session)
def implement_scipy(scipy_fun):
import re
import textwrap
def wrapper(fun):
if scipy_fun is None:
return None
if not fun.__doc__:
doc_str = textwrap.dedent(scipy_fun.__doc__)
lines = []
for line in doc_str.splitlines(keepends=False):
# skip function headers
if line.startswith(scipy_fun.__name__ + "("):
continue
# skip version marks
if line.strip().startswith(".. versionadded::"):
continue
# skip examples
if line.strip() == "Examples":
break
lines.append(line)
doc_str = "\n".join(lines).strip()
# remove trailing empty sections
fun.__doc__ = re.sub(r"[A-Za-z]+\n-+$", "", doc_str).strip()
return fun
return wrapper
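# A hedged usage sketch (``scipy.special.erf`` stands in for any wrapped
# function; the decorated name is illustrative, not a real call site here):
#
#   from scipy import special
#
#   @implement_scipy(special.erf)
#   def erf(x, out=None, where=True, **kwargs):
#       ...
#
# The decorated function inherits scipy's docstring with the signature
# header, ``.. versionadded::`` marks and the Examples section stripped.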