core/maxframe/dataframe/arithmetic/core.py (300 lines of code) (raw):
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import numpy as np
import pandas as pd
from ...core import ENTITY_TYPE
from ...serialization.serializables import AnyField
from ...tensor.core import TENSOR_TYPE
from ...utils import classproperty, get_dtype
from ..core import DATAFRAME_TYPE, SERIES_TYPE
from ..operators import DataFrameOperator, DataFrameOperatorMixin
from ..ufunc.tensor import TensorUfuncMixin
from ..utils import (
build_empty_df,
infer_dtype,
infer_dtypes,
infer_index_value,
parse_index,
)
class DataFrameBinOpMixin(DataFrameOperatorMixin):
@classproperty
def _operator(self):
raise NotImplementedError
@classmethod
def _calc_properties(cls, x1, x2=None, axis="columns", level=None):
if isinstance(x1, DATAFRAME_TYPE) and (
x2 is None or pd.api.types.is_scalar(x2) or isinstance(x2, TENSOR_TYPE)
):
if pd.api.types.is_scalar(x2):
dtypes = cls._operator(build_empty_df(x1.dtypes), x2).dtypes
elif x1.dtypes is not None and isinstance(x2, TENSOR_TYPE):
dtypes = pd.Series(
[infer_dtype(dt, x2.dtype, cls._operator) for dt in x1.dtypes],
index=x1.dtypes.index,
)
else: # pragma: no cover
dtypes = x1.dtypes
return {
"shape": x1.shape,
"dtypes": dtypes,
"columns_value": x1.columns_value,
"index_value": x1.index_value,
}
if isinstance(x1, SERIES_TYPE) and (
x2 is None or pd.api.types.is_scalar(x2) or isinstance(x2, TENSOR_TYPE)
):
x2_dtype = x2.dtype if hasattr(x2, "dtype") else type(x2)
x2_dtype = get_dtype(x2_dtype)
if hasattr(cls, "return_dtype"):
dtype = cls.return_dtype
else:
dtype = infer_dtype(x1.dtype, x2_dtype, cls._operator)
ret = {"shape": x1.shape, "dtype": dtype}
if pd.api.types.is_scalar(x2) or (
hasattr(x2, "ndim") and (x2.ndim == 0 or x2.ndim == 1)
):
ret["name"] = x1.name
ret["index_value"] = x1.index_value
return ret
if isinstance(x1, DATAFRAME_TYPE) and isinstance(x2, DATAFRAME_TYPE):
index_shape, column_shape, dtypes, columns, index = (
np.nan,
np.nan,
None,
None,
None,
)
if (
x1.columns_value is not None
and x2.columns_value is not None
and x1.columns_value.key == x2.columns_value.key
):
dtypes = pd.Series(
[
infer_dtype(dt1, dt2, cls._operator)
for dt1, dt2 in zip(x1.dtypes, x2.dtypes)
],
index=x1.dtypes.index,
)
columns = copy.copy(x1.columns_value)
column_shape = len(dtypes)
elif x1.dtypes is not None and x2.dtypes is not None:
dtypes = infer_dtypes(x1.dtypes, x2.dtypes, cls._operator)
columns = parse_index(dtypes.index, store_data=True)
column_shape = len(dtypes)
if x1.index_value is not None and x2.index_value is not None:
if x1.index_value.key == x2.index_value.key:
index = copy.copy(x1.index_value)
index_shape = x1.shape[0]
else:
index = infer_index_value(
x1.index_value, x2.index_value, level=level
)
if index.key == x1.index_value.key == x2.index_value.key and (
not np.isnan(x1.shape[0]) or not np.isnan(x2.shape[0])
):
index_shape = (
x1.shape[0] if not np.isnan(x1.shape[0]) else x2.shape[0]
)
return {
"shape": (index_shape, column_shape),
"dtypes": dtypes,
"columns_value": columns,
"index_value": index,
}
if isinstance(x1, DATAFRAME_TYPE) and isinstance(x2, SERIES_TYPE):
if axis == "columns" or axis == 1:
index_shape = x1.shape[0]
index = x1.index_value
column_shape, dtypes, columns = np.nan, None, None
if x1.columns_value is not None and x1.index_value is not None:
if x1.columns_value.key == x2.index_value.key:
dtypes = pd.Series(
[
infer_dtype(dt, x2.dtype, cls._operator)
for dt in x1.dtypes
],
index=x1.dtypes.index,
)
columns = copy.copy(x1.columns_value)
column_shape = len(dtypes)
else: # pragma: no cover
dtypes = x1.dtypes # FIXME
columns = infer_index_value(
x1.columns_value, x2.index_value, level=level
)
column_shape = np.nan
else:
assert axis == "index" or axis == 0
column_shape = x1.shape[1]
columns = x1.columns_value
dtypes = x1.dtypes
index_shape, index = np.nan, None
if x1.index_value is not None and x1.index_value is not None:
if x1.index_value.key == x2.index_value.key:
dtypes = pd.Series(
[
infer_dtype(dt, x2.dtype, cls._operator)
for dt in x1.dtypes
],
index=x1.dtypes.index,
)
index = copy.copy(x1.index_value)
index_shape = x1.shape[0]
else:
if x1.dtypes is not None:
dtypes = pd.Series(
[
infer_dtype(dt, x2.dtype, cls._operator)
for dt in x1.dtypes
],
index=x1.dtypes.index,
)
index = infer_index_value(
x1.index_value, x2.index_value, level=level
)
index_shape = np.nan
return {
"shape": (index_shape, column_shape),
"dtypes": dtypes,
"columns_value": columns,
"index_value": index,
}
if isinstance(x1, SERIES_TYPE) and isinstance(x2, SERIES_TYPE):
index_shape, dtype, index = np.nan, None, None
dtype = infer_dtype(x1.dtype, x2.dtype, cls._operator)
if x1.index_value is not None and x2.index_value is not None:
if x1.index_value.key == x2.index_value.key:
index = copy.copy(x1.index_value)
index_shape = x1.shape[0]
else:
index = infer_index_value(
x1.index_value, x2.index_value, level=level
)
if index.key == x1.index_value.key == x2.index_value.key and (
not np.isnan(x1.shape[0]) or not np.isnan(x2.shape[0])
):
index_shape = (
x1.shape[0] if not np.isnan(x1.shape[0]) else x2.shape[0]
)
ret = {"shape": (index_shape,), "dtype": dtype, "index_value": index}
if x1.name == x2.name:
ret["name"] = x1.name
return ret
raise NotImplementedError("Unknown combination of parameters")
def _check_inputs(self, x1, x2):
if isinstance(x1, TENSOR_TYPE) or isinstance(x2, TENSOR_TYPE):
tensor, other = (x1, x2) if isinstance(x1, TENSOR_TYPE) else (x2, x1)
if isinstance(other, DATAFRAME_TYPE):
if self.axis == "index" or self.axis == 0:
other_shape = tuple(reversed(other.shape))
else:
other_shape = other.shape
if tensor.ndim == 2 and tensor.shape != other_shape:
raise ValueError(
f"Unable to coerce to DataFrame, shape must be {other_shape}: "
f"given {tensor.shape}"
)
elif tensor.ndim == 1 and tensor.shape[0] != other_shape[1]:
raise ValueError(
f"Unable to coerce to Series, length must be {other_shape[1]}: "
f"given {tensor.shape[0]}"
)
elif tensor.ndim > 2:
raise ValueError(
"Unable to coerce to Series/DataFrame, dim must be <= 2"
)
if isinstance(other, SERIES_TYPE):
if tensor.ndim == 1 and (tensor.shape[0] != other.shape[0]):
raise ValueError(
f"Unable to coerce to Series, length must be {other.shape[0]}: "
f"given {tensor.shape[0]}"
)
elif tensor.ndim > 1:
raise ValueError("Unable to coerce to Series, dim must be 1")
def _call(self, x1, x2):
self._check_inputs(x1, x2)
if isinstance(x1, DATAFRAME_TYPE) or isinstance(x2, DATAFRAME_TYPE):
df1, df2 = (x1, x2) if isinstance(x1, DATAFRAME_TYPE) else (x2, x1)
kw = self._calc_properties(df1, df2, axis=self.axis, level=self.level)
if not pd.api.types.is_scalar(df2):
return self.new_dataframe([x1, x2], **kw)
else:
return self.new_dataframe([df1], **kw)
if isinstance(x1, SERIES_TYPE) or isinstance(x2, SERIES_TYPE):
s1, s2 = (x1, x2) if isinstance(x1, SERIES_TYPE) else (x2, x1)
kw = self._calc_properties(s1, s2, level=self.level)
if not pd.api.types.is_scalar(s2):
return self.new_series([x1, x2], **kw)
else:
return self.new_series([s1], **kw)
raise NotImplementedError(
"Only support add dataframe, series or scalar for now"
)
def __call__(self, x1, x2):
x1 = self._process_input(x1)
x2 = self._process_input(x2)
if isinstance(x1, SERIES_TYPE) and isinstance(x2, DATAFRAME_TYPE):
# reject invoking series's op on dataframe
raise NotImplementedError
return self._call(x1, x2)
def rcall(self, x1, x2):
x1 = self._process_input(x1)
x2 = self._process_input(x2)
if isinstance(x1, SERIES_TYPE) and isinstance(x2, DATAFRAME_TYPE):
# reject invoking series's op on dataframe
raise NotImplementedError
return self._call(x2, x1)
class DataFrameBinOp(DataFrameOperator, DataFrameBinOpMixin):
axis = AnyField("axis", default=None)
level = AnyField("level", default=None)
fill_value = AnyField("fill_value", default=None)
lhs = AnyField("lhs")
rhs = AnyField("rhs")
def __init__(self, output_types=None, **kw):
super().__init__(_output_types=output_types, **kw)
def _set_inputs(self, inputs):
super()._set_inputs(inputs)
if len(self._inputs) == 2:
self.lhs = self._inputs[0]
self.rhs = self._inputs[1]
else:
if isinstance(self.lhs, ENTITY_TYPE):
self.lhs = self._inputs[0]
elif pd.api.types.is_scalar(self.lhs):
self.rhs = self._inputs[0]
class DataFrameUnaryOpMixin(DataFrameOperatorMixin):
__slots__ = ()
class DataFrameUnaryOp(DataFrameOperator, DataFrameUnaryOpMixin):
def __init__(self, output_types=None, **kw):
super().__init__(_output_types=output_types, **kw)
@classmethod
def _get_output_dtype(cls, df):
if df.ndim == 2:
return df.dtypes
else:
return df.dtype
def __call__(self, df):
self.output_types = df.op.output_types
if df.ndim == 2:
return self.new_dataframe(
[df],
shape=df.shape,
dtypes=self._get_output_dtype(df),
columns_value=df.columns_value,
index_value=df.index_value,
)
else:
series = df
return self.new_series(
[series],
shape=series.shape,
name=series.name,
index_value=series.index_value,
dtype=self._get_output_dtype(series),
)
class DataFrameArithmeticTreeMixin:
def _set_inputs(self, inputs):
inputs = self._get_inputs_data(inputs)
setattr(self, "_inputs", inputs)
class DataFrameUnaryUfunc(DataFrameUnaryOp, TensorUfuncMixin):
pass
class DataFrameBinopUfunc(DataFrameBinOp, TensorUfuncMixin):
pass