core/maxframe/tensor/arithmetic/core.py (414 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
from ...core import ExecutableTuple
from ...serialization.serializables import (
AnyField,
DictField,
FieldTypes,
KeyField,
ListField,
StringField,
)
from ..core import Tensor, TensorOrder
from ..datasource import tensor as astensor
from ..operators import TensorOperator, TensorOperatorMixin
from ..utils import broadcast_shape, check_order, check_out_param, filter_inputs
class TensorElementWise(TensorOperatorMixin):
__slots__ = ()
class TensorElementWiseWithInputs(TensorElementWise):
def _set_sparse(self, inputs):
raise NotImplementedError
def _new_tileables(self, inputs, kws=None, **kw):
self._set_sparse(inputs)
return super()._new_tileables(inputs, kws=kws, **kw)
def _handle_out_dtype(val, dtype):
if val.dtype != dtype:
return val.astype(dtype)
return val
class TensorBinOpMixin(TensorElementWiseWithInputs):
__slots__ = ()
def check_inputs(self, inputs):
if len(inputs) > 4:
raise ValueError(
f"Binary operator's inputs should less than or equal 4, got {len(inputs)}"
)
@classmethod
def _get_func(cls, xp):
func_name = getattr(cls, "_func_name")
return getattr(xp, func_name)
class TensorBinOp(TensorOperator, TensorBinOpMixin):
lhs = AnyField("lhs", default=None)
rhs = AnyField("rhs", default=None)
out = KeyField("out", default=None)
where = KeyField("where", default=None)
casting = StringField("casting", default=None)
order = StringField("order", default=None)
err = DictField("err", FieldTypes.string, FieldTypes.string, default_factory=dict)
def __init__(self, **kwargs):
super().__init__(**kwargs)
if self.order is None:
self.order = "K"
check_order(self.order)
@classmethod
def _is_sparse(cls, x1, x2):
return False
def _set_sparse(self, inputs):
inputs_iter = iter(inputs)
x1 = self.lhs if np.isscalar(self.lhs) else next(inputs_iter)
x2 = self.rhs if np.isscalar(self.rhs) else next(inputs_iter)
setattr(self, "sparse", self._is_sparse(x1, x2))
def _set_inputs(self, inputs):
super()._set_inputs(inputs)
inputs_iter = iter(self._inputs)
self.lhs = self.lhs if np.isscalar(self.lhs) else next(inputs_iter)
self.rhs = self.rhs if np.isscalar(self.rhs) else next(inputs_iter)
if getattr(self, "out", None) is not None:
self.out = next(inputs_iter)
if getattr(self, "where", None) is not None:
self.where = next(inputs_iter)
def _process_inputs(self, x1, x2, out, where):
x1 = x1 if np.isscalar(x1) else astensor(x1)
x2 = x2 if np.isscalar(x2) else astensor(x2)
self.lhs = x1
self.rhs = x2
if out is not None:
if isinstance(out, Tensor):
self.out = out
else:
raise TypeError(f"out should be Tensor object, got {type(out)} instead")
if where is True:
where = None
if where is not None:
where = astensor(where)
self.where = where
return x1, x2, out, where
def _calc_order(self, x1, x2, out):
if out is not None:
return out.order
if self.order in "KA":
orders = []
if not np.isscalar(x1):
orders.append(x1.order)
if not np.isscalar(x2):
orders.append(x2.order)
if len(orders) == 0:
return TensorOrder.C_ORDER
elif any(order == TensorOrder.C_ORDER for order in orders):
return TensorOrder.C_ORDER
else:
return TensorOrder.F_ORDER
elif self.order == "C":
return TensorOrder.C_ORDER
else:
return TensorOrder.F_ORDER
@property
def ufunc_extra_params(self):
return dict()
def _call_tensor_ufunc(self, x1, x2, out=None, where=None):
if hasattr(x1, "__tensor_ufunc__") or hasattr(x2, "__tensor_ufunc__"):
ufunc = (
x1.__tensor_ufunc__
if hasattr(x1, "__tensor_ufunc__")
else x2.__tensor_ufunc__
)
ret = ufunc(type(self), [x1, x2], out, where, **self.ufunc_extra_params)
if ret is NotImplemented:
return
return ret
def _call(self, x1, x2, out=None, where=None):
# check tensor ufunc, if x1 or x2 is not a tensor, e.g. MaxFrame DataFrame
# which implements tensor ufunc, will delegate the computation
# to it if possible
ret = self._call_tensor_ufunc(x1, x2, out=out, where=where)
if ret is not None:
return ret
x1, x2, out, where = self._process_inputs(x1, x2, out, where)
# check broadcast
x1_shape = () if np.isscalar(x1) else x1.shape
x2_shape = () if np.isscalar(x2) else x2.shape
shape = broadcast_shape(x1_shape, x2_shape)
order = self._calc_order(x1, x2, out)
inputs = filter_inputs([x1, x2, out, where])
t = self.new_tensor(inputs, shape, order=order)
if out is None:
return t
check_out_param(out, t, self.casting)
out_shape, out_dtype = out.shape, out.dtype
# if `out` is specified, use out's dtype and shape
if t.shape != out_shape:
t = self.new_tensor(inputs, out_shape, order=order)
setattr(self, "dtype", out_dtype)
out.data = t.data
return out
def __call__(self, x1, x2, out=None, where=None):
return self._call(x1, x2, out=out, where=where)
def rcall(self, x1, x2, out=None, where=None):
return self._call(x2, x1, out=out, where=where)
class TensorUnaryOpMixin(TensorElementWiseWithInputs):
__slots__ = ()
def check_inputs(self, inputs):
if len(inputs) > 3:
raise ValueError(
f"Binary operator's inputs should less than or equal 3, got {len(inputs)}"
)
@classmethod
def _get_func(cls, xp):
func_name = getattr(cls, "_func_name")
return getattr(xp, func_name)
class TensorUnaryOp(TensorOperator, TensorUnaryOpMixin):
_input = KeyField("input")
out = KeyField("out", default=None)
where = KeyField("where", default=None)
casting = StringField("casting", default=None)
order = StringField("order", default=None)
err = DictField("err", FieldTypes.string, FieldTypes.string, default_factory=dict)
def __init__(self, **kwargs):
super().__init__(**kwargs)
if self.order is None:
self.order = "K"
check_order(self.order)
@property
def input(self):
return self._input
@classmethod
def _is_sparse(cls, x):
if hasattr(x, "issparse") and x.issparse():
return True
else:
return False
def _set_inputs(self, inputs):
super()._set_inputs(inputs)
inputs_iter = iter(self._inputs)
self._input = next(inputs_iter)
if getattr(self, "out", None) is not None:
self.out = next(inputs_iter)
if getattr(self, "where", None) is not None:
self.where = next(inputs_iter)
def _process_inputs(self, x, out, where):
x = astensor(x)
if out is not None:
if isinstance(out, Tensor):
self.out = out
else:
raise TypeError(f"out should be Tensor object, got {type(out)} instead")
if where is True:
where = None
if where is not None:
where = astensor(where)
self.where = where
return x, out, where
def _set_sparse(self, inputs):
setattr(self, "sparse", self._is_sparse(inputs[0]))
def _calc_order(self, x, out):
if out is not None:
return out.order
if self.order in "KA":
return x.order
elif self.order == "C":
return TensorOrder.C_ORDER
else:
return TensorOrder.F_ORDER
@property
def ufunc_extra_params(self):
return dict()
def _call_tensor_ufunc(self, x, out=None, where=None):
if hasattr(x, "__tensor_ufunc__"):
ret = x.__tensor_ufunc__(
type(self), [x], out, where, **self.ufunc_extra_params
)
if ret is NotImplemented:
return
return ret
def _call(self, x, out=None, where=None):
# check tensor ufunc, if x is not a tensor, e.g. MaxFrame DataFrame
# which implements tensor ufunc, will delegate the computation
# to it if possible
ret = self._call_tensor_ufunc(x, out=out, where=where)
if ret is not None:
return ret
x, out, where = self._process_inputs(x, out, where)
shape = x.shape
order = self._calc_order(x, out)
inputs = filter_inputs([x, out, where])
t = self.new_tensor(inputs, shape, order=order)
if out is None:
return t
check_out_param(out, t, getattr(self, "casting"))
out_shape, out_dtype = out.shape, out.dtype
# if `out` is specified, use out's dtype and shape
if t.shape != out_shape:
t = self.new_tensor(inputs, out_shape, order=order)
setattr(self, "dtype", out_dtype)
out.data = t.data
return out
def __call__(self, x, out=None, where=None):
return self._call(x, out=out, where=where)
class TensorOutBinOp(TensorOperator, TensorElementWiseWithInputs):
_input = KeyField("input")
out1 = KeyField("out1", default=None)
out2 = KeyField("out2", default=None)
where = KeyField("where", default=None)
order = StringField("order", default=None)
casting = StringField("casting", default=None)
def __init__(self, **kwargs):
super().__init__(**kwargs)
if self.order is None:
self.order = "K"
check_order(self.order)
@property
def output_limit(self):
return 2
@property
def input(self):
return self._input
def _set_inputs(self, inputs):
super()._set_inputs(inputs)
inputs_iter = iter(self._inputs)
self._input = next(inputs_iter)
if getattr(self, "out1", None) is not None:
self.out1 = next(inputs_iter)
if getattr(self, "out2", None) is not None:
self.out2 = next(inputs_iter)
if getattr(self, "where", None) is not None:
self.where = next(inputs_iter)
def _process_inputs(self, x, out1, out2, where):
x = astensor(x)
if out1 is not None:
if isinstance(out1, Tensor):
self.out1 = out1
else:
raise TypeError(
f"out1 should be Tensor object, got {type(out1)} instead"
)
if out2 is not None:
if isinstance(out2, Tensor):
self.out2 = out2
else:
raise TypeError(
f"out2 should be Tensor object, got {type(out2)} instead"
)
if where is True:
where = None
if where is not None:
where = astensor(where)
self.where = where
return x, out1, out2, where
@classmethod
def _is_sparse(cls, x):
return False
def _set_sparse(self, inputs):
setattr(self, "sparse", self._is_sparse(inputs[0]))
@property
def _fun(self):
raise NotImplementedError
def _calc_order(self, x, out):
if out is not None:
return out.order
if self.order in "KA":
return x.order
elif self.order == "C":
return TensorOrder.C_ORDER
else:
return TensorOrder.F_ORDER
def _call(self, x, out1=None, out2=None, out=None, where=None):
dtype = [r.dtype for r in self._fun(np.empty(1, dtype=x.dtype))]
out = out or (None, None)
out1 = out1 or out[0]
out2 = out2 or out[1]
x, out1, out2, where = self._process_inputs(x, out1, out2, where)
shape = x.shape
order1 = self._calc_order(x, out1)
order2 = self._calc_order(x, out2)
inputs = filter_inputs([x, out1, out2, where])
t1, t2 = self.new_tensors(
inputs,
shape,
kws=[
{"order": order1, "dtype": dtype[0], "side": "left"},
{"order": order2, "dtype": dtype[1], "side": "right"},
],
)
if out1 is None and out2 is None:
return ExecutableTuple([t1, t2])
if out1 is not None:
check_out_param(out1, t1, self.casting)
out1_shape, out1_dtype = out1.shape, out1.dtype
else:
out1_shape, out1_dtype = t1.shape, t1.dtype
if out2 is not None:
check_out_param(out2, t2, self.casting)
out2_shape, out2_dtype = out2.shape, out2.dtype
else:
out2_shape, out2_dtype = t2.shape, t2.dtype
# if `out` is specified, use out's dtype and shape
if t1.shape != out1_shape or t2.shape != out2_shape:
t1, t2 = self.new_tensor(
inputs,
[out1_shape, out2_shape],
kws=[
{"order": order1, "dtype": out1_dtype},
{"order": order2, "dtype": out2_dtype},
],
)
if out1 is not None:
out1.data = t1.data
else:
out1 = t1
if out2 is not None:
out2.data = t2.data
else:
out2 = t2
return ExecutableTuple([out1, out2])
def __call__(self, x, out1=None, out2=None, out=None, where=None):
return self._call(x, out1=out1, out2=out2, out=out, where=where)
class TensorMultiOp(TensorElementWiseWithInputs, TensorOperator):
args = ListField("args", default=None)
out = KeyField("out", default=None)
where = KeyField("where", default=None)
casting = StringField("casting", default=None)
order = StringField("order", default=None)
err = DictField("err", FieldTypes.string, FieldTypes.string, default_factory=dict)
def __init__(self, **kwargs):
super().__init__(**kwargs)
if self.casting is None:
self.casting = "same_kind"
if self.order is None:
self.order = "K"
check_order(self.order)
@classmethod
def _is_sparse(cls, *args):
return False
def _set_sparse(self, inputs):
inputs_iter = iter(inputs or ())
args = list(self.args)
for idx in range(len(self.args)):
if not np.isscalar(self.args[idx]):
args[idx] = next(inputs_iter)
setattr(self, "sparse", self._is_sparse(*args))
def _set_inputs(self, inputs):
super()._set_inputs(inputs)
inputs_iter = iter(inputs or ())
args = list(self.args)
for idx in range(len(args)):
if not np.isscalar(args[idx]):
args[idx] = next(inputs_iter)
self.args = args
if getattr(self, "out", None) is not None:
self.out = next(inputs_iter)
if getattr(self, "where", None) is not None:
self.where = next(inputs_iter)
def _process_inputs(self, *args, out=None):
self.args = [a if np.isscalar(a) else astensor(a) for a in args]
if out is not None:
if isinstance(out, Tensor):
self.out = out
else:
raise TypeError(f"out should be Tensor object, got {type(out)} instead")
return args + (out,)
def __call__(self, *args, out=None):
proc_inputs_results = self._process_inputs(*args, out=out)
args = proc_inputs_results[:-1]
(out,) = proc_inputs_results[-1:]
# check broadcast
shapes = [() if np.isscalar(a) else a.shape for a in self.args]
shape = broadcast_shape(*shapes)
order = out.order if out is not None else None
inputs = filter_inputs(list(args) + [out])
t = self.new_tensor(inputs, shape, order=order)
if out is None:
return t
check_out_param(out, t, self.casting)
out_shape, out_dtype = out.shape, out.dtype
# if `out` is specified, use out's dtype and shape
if t.shape != out_shape:
t = self.new_tensor(inputs, out_shape, order=order)
setattr(self, "dtype", out_dtype)
out.data = t.data
return out