core/maxframe/dataframe/misc/apply.py (346 lines of code) (raw):

# Copyright 1999-2025 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import inspect from typing import Any, Union import numpy as np import pandas as pd from pandas import DataFrame, Series from ... import opcodes from ...core import OutputType from ...core.operator import OperatorLogicKeyGeneratorMixin from ...serialization.serializables import ( AnyField, BoolField, DictField, FunctionField, StringField, TupleField, ) from ...utils import get_func_token, quiet_stdio, tokenize from ..operators import DataFrameOperator, DataFrameOperatorMixin from ..utils import ( build_df, build_series, copy_func_scheduling_hints, make_dtype, make_dtypes, pack_func_args, parse_index, validate_axis, validate_output_types, ) class ApplyOperandLogicKeyGeneratorMixin(OperatorLogicKeyGeneratorMixin): def _get_logic_key_token_values(self): token_values = super()._get_logic_key_token_values() + [ self.axis, self.convert_dtype, self.raw, self.result_type, self.elementwise, ] if self.func: return token_values + [get_func_token(self.func)] else: # pragma: no cover return token_values class ApplyOperator( DataFrameOperator, DataFrameOperatorMixin, ApplyOperandLogicKeyGeneratorMixin ): _op_type_ = opcodes.APPLY func = FunctionField("func") axis = AnyField("axis", default=0) convert_dtype = BoolField("convert_dtype", default=True) raw = BoolField("raw", default=False) result_type = StringField("result_type", default=None) elementwise = BoolField("elementwise") logic_key = StringField("logic_key") need_clean_up_func = BoolField("need_clean_up_func") args = TupleField("args", default=()) kwds = DictField("kwds", default={}) def __init__(self, output_type=None, **kw): if output_type: kw["_output_types"] = [output_type] super().__init__(**kw) if hasattr(self, "func"): copy_func_scheduling_hints(self.func, self) def _update_key(self): values = [v for v in self._values_ if v is not self.func] + [ get_func_token(self.func) ] self._obj_set("_key", tokenize(type(self).__name__, *values)) return self def _infer_df_func_returns(self, df, dtypes, dtype=None, name=None, index=None): if isinstance(self.func, np.ufunc): output_type = OutputType.dataframe new_dtypes = None index_value = "inherit" new_elementwise = True else: if self.output_types is not None and ( dtypes is not None or dtype is not None ): ret_dtypes = dtypes if dtypes is not None else (name, dtype) ret_index_value = parse_index(index) if index is not None else None self.elementwise = False return ret_dtypes, ret_index_value output_type = new_dtypes = index_value = None new_elementwise = False try: empty_df = build_df(df, size=2) with np.errstate(all="ignore"), quiet_stdio(): infer_df = empty_df.apply( self.func, axis=self.axis, raw=self.raw, result_type=self.result_type, args=self.args, **self.kwds, ) if index_value is None: if infer_df.index is empty_df.index: index_value = "inherit" else: index_value = parse_index(pd.RangeIndex(-1)) if isinstance(infer_df, pd.DataFrame): output_type = output_type or OutputType.dataframe new_dtypes = new_dtypes or infer_df.dtypes else: output_type = output_type or OutputType.series new_dtypes = (name or infer_df.name, dtype or infer_df.dtype) new_elementwise = False if new_elementwise is None else new_elementwise except: # noqa: E722 # nosec pass self.output_types = ( [output_type] if not self.output_types and output_type else self.output_types ) dtypes = new_dtypes if dtypes is None else dtypes index_value = index_value if index is None else parse_index(index) self.elementwise = ( new_elementwise if self.elementwise is None else self.elementwise ) return dtypes, index_value def _call_df_or_series(self, df): return self.new_df_or_series([df]) def _call_dataframe(self, df, dtypes=None, dtype=None, name=None, index=None): # for backward compatibility dtype = dtype if dtype is not None else dtypes dtypes, index_value = self._infer_df_func_returns( df, dtypes, dtype=dtype, name=name, index=index ) if index_value is None: index_value = parse_index(None, (df.key, df.index_value.key)) for arg, desc in zip((self.output_types, dtypes), ("output_types", "dtypes")): if arg is None: raise TypeError( f"Cannot determine {desc} by calculating with enumerate data, " "please specify it as arguments" ) if index_value == "inherit": index_value = df.index_value if self.elementwise: shape = df.shape elif self.output_types[0] == OutputType.dataframe: shape = [np.nan, np.nan] shape[1 - self.axis] = df.shape[1 - self.axis] if self.axis == 1: shape[1] = len(dtypes) shape = tuple(shape) else: shape = (df.shape[1 - self.axis],) if self.output_types[0] == OutputType.dataframe: if self.axis == 0: return self.new_dataframe( [df], shape=shape, dtypes=dtypes, index_value=index_value, columns_value=parse_index(dtypes.index, store_data=True), ) else: return self.new_dataframe( [df], shape=shape, dtypes=dtypes, index_value=df.index_value, columns_value=parse_index(dtypes.index, store_data=True), ) else: name, dtype = dtypes return self.new_series( [df], shape=shape, name=name, dtype=dtype, index_value=index_value ) def _call_series(self, series, dtypes=None, dtype=None, name=None, index=None): # for backward compatibility dtype = dtype if dtype is not None else dtypes if self.convert_dtype: if self.output_types is not None and ( dtypes is not None or dtype is not None ): infer_series = test_series = None else: test_series = build_series(series, size=2, name=series.name) try: with np.errstate(all="ignore"), quiet_stdio(): infer_series = test_series.apply( self.func, args=self.args, **self.kwds ) except: # noqa: E722 # nosec # pylint: disable=bare-except infer_series = None output_type = self._output_types[0] if index is not None: index_value = parse_index(index) elif infer_series is not None: if infer_series.index is test_series.index: index_value = series.index_value else: # pragma: no cover index_value = parse_index(infer_series.index) else: index_value = parse_index(series.index_value) if output_type == OutputType.dataframe: if dtypes is None: if infer_series is not None and infer_series.ndim == 2: dtypes = infer_series.dtypes else: raise TypeError( "Cannot determine dtypes, " "please specify `dtypes` as argument" ) columns_value = parse_index(dtypes.index, store_data=True) return self.new_dataframe( [series], shape=(series.shape[0], len(dtypes)), index_value=index_value, columns_value=columns_value, dtypes=dtypes, ) else: if ( dtype is None and infer_series is not None and infer_series.ndim == 1 ): dtype = infer_series.dtype else: dtype = dtype if dtype is not None else np.dtype(object) if infer_series is not None and infer_series.ndim == 1: name = name or infer_series.name return self.new_series( [series], dtype=dtype, shape=series.shape, index_value=index_value, name=name, ) else: dtype = dtype if dtype is not None else np.dtype("object") return self.new_series( [series], dtype=dtype, shape=series.shape, index_value=series.index_value, name=name, ) def __call__(self, df_or_series, dtypes=None, dtype=None, name=None, index=None): axis = getattr(self, "axis", None) or 0 dtypes = make_dtypes(dtypes) dtype = make_dtype(dtype) self.axis = validate_axis(axis, df_or_series) if self.output_types and self.output_types[0] == OutputType.df_or_series: return self._call_df_or_series(df_or_series) if df_or_series.op.output_types[0] == OutputType.dataframe: return self._call_dataframe( df_or_series, dtypes=dtypes, dtype=dtype, name=name, index=index ) else: return self._call_series( df_or_series, dtypes=dtypes, dtype=dtype, name=name, index=index ) def _build_stub_pandas_obj(self, df_or_series) -> Union[DataFrame, Series]: if self.output_types[0] == OutputType.dataframe: return build_df(df_or_series, size=2) return build_series(df_or_series, size=2, name=df_or_series.name) def get_packed_funcs(self, df=None) -> Any: stub_df = self._build_stub_pandas_obj(df or self.inputs[0]) return pack_func_args(stub_df, self.func, *self.args, **self.kwds) def df_apply( df, func, axis=0, raw=False, result_type=None, args=(), dtypes=None, dtype=None, name=None, output_type=None, index=None, elementwise=None, skip_infer=False, **kwds, ): # FIXME: https://github.com/aliyun/alibabacloud-odps-maxframe-client/issues/50 """ Apply a function along an axis of the DataFrame. Objects passed to the function are Series objects whose index is either the DataFrame's index (``axis=0``) or the DataFrame's columns (``axis=1``). By default (``result_type=None``), the final return type is inferred from the return type of the applied function. Otherwise, it depends on the `result_type` argument. Parameters ---------- func : function Function to apply to each column or row. axis : {0 or 'index', 1 or 'columns'}, default 0 Axis along which the function is applied: * 0 or 'index': apply function to each column. * 1 or 'columns': apply function to each row. raw : bool, default False Determines if row or column is passed as a Series or ndarray object: * ``False`` : passes each row or column as a Series to the function. * ``True`` : the passed function will receive ndarray objects instead. If you are just applying a NumPy reduction function this will achieve much better performance. result_type : {'expand', 'reduce', 'broadcast', None}, default None These only act when ``axis=1`` (columns): * 'expand' : list-like results will be turned into columns. * 'reduce' : returns a Series if possible rather than expanding list-like results. This is the opposite of 'expand'. * 'broadcast' : results will be broadcast to the original shape of the DataFrame, the original index and columns will be retained. The default behaviour (None) depends on the return value of the applied function: list-like results will be returned as a Series of those. However if the apply function returns a Series these are expanded to columns. output_type : {'dataframe', 'series'}, default None Specify type of returned object. See `Notes` for more details. dtypes : Series, default None Specify dtypes of returned DataFrames. See `Notes` for more details. dtype : numpy.dtype, default None Specify dtype of returned Series. See `Notes` for more details. name : str, default None Specify name of returned Series. See `Notes` for more details. index : Index, default None Specify index of returned object. See `Notes` for more details. elementwise : bool, default False Specify whether ``func`` is an elementwise function: * ``False`` : The function is not elementwise. MaxFrame will try concatenating chunks in rows (when ``axis=0``) or in columns (when ``axis=1``) and then apply ``func`` onto the concatenated chunk. The concatenation step can cause extra latency. * ``True`` : The function is elementwise. MaxFrame will apply ``func`` to original chunks. This will not introduce extra concatenation step and reduce overhead. skip_infer: bool, default False Whether infer dtypes when dtypes or output_type is not specified. args : tuple Positional arguments to pass to `func` in addition to the array/series. **kwds Additional keyword arguments to pass as keywords arguments to `func`. Returns ------- Series or DataFrame Result of applying ``func`` along the given axis of the DataFrame. See Also -------- DataFrame.applymap: For elementwise operations. DataFrame.aggregate: Only perform aggregating type operations. DataFrame.transform: Only perform transforming type operations. Notes ----- When deciding output dtypes and shape of the return value, MaxFrame will try applying ``func`` onto a mock DataFrame, and the apply call may fail. When this happens, you need to specify the type of apply call (DataFrame or Series) in output_type. * For DataFrame output, you need to specify a list or a pandas Series as ``dtypes`` of output DataFrame. ``index`` of output can also be specified. * For Series output, you need to specify ``dtype`` and ``name`` of output Series. * For any input with data type ``pandas.ArrowDtype(pyarrow.MapType)``, it will always be converted to a Python dict. And for any output with this data type, it must be returned as a Python dict as well. Examples -------- >>> import numpy as np >>> import maxframe.tensor as mt >>> import maxframe.dataframe as md >>> df = md.DataFrame([[4, 9]] * 3, columns=['A', 'B']) >>> df.execute() A B 0 4 9 1 4 9 2 4 9 Using a reducing function on either axis >>> df.apply(np.sum, axis=0).execute() A 12 B 27 dtype: int64 >>> df.apply(lambda row: int(np.sum(row)), axis=1).execute() 0 13 1 13 2 13 dtype: int64 Passing ``result_type='expand'`` will expand list-like results to columns of a Dataframe >>> df.apply(lambda x: [1, 2], axis=1, result_type='expand').execute() 0 1 0 1 2 1 1 2 2 1 2 Returning a Series inside the function is similar to passing ``result_type='expand'``. The resulting column names will be the Series index. >>> df.apply(lambda x: pd.Series([1, 2], index=['foo', 'bar']), axis=1).execute() foo bar 0 1 2 1 1 2 2 1 2 Passing ``result_type='broadcast'`` will ensure the same shape result, whether list-like or scalar is returned by the function, and broadcast it along the axis. The resulting column names will be the originals. >>> df.apply(lambda x: [1, 2], axis=1, result_type='broadcast').execute() A B 0 1 2 1 1 2 2 1 2 Create a dataframe with a map type. >>> import pyarrow as pa >>> import pandas as pd >>> from maxframe.lib.dtypes_extension import dict_ >>> col_a = pd.Series( ... data=[[("k1", 1), ("k2", 2)], [("k1", 3)], None], ... index=[1, 2, 3], ... dtype=dict_(pa.string(), pa.int64()), ... ) >>> col_b = pd.Series( ... data=["A", "B", "C"], ... index=[1, 2, 3], ... ) >>> df = md.DataFrame({"A": col_a, "B": col_b}) >>> df.execute() A B 1 [('k1', 1), ('k2', 2)] A 2 [('k1', 3)] B 3 <NA> C Define a function that updates the map type with a new key-value pair. >>> def custom_set_item(x): ... if x["A"] is not None: ... x["A"]["k2"] = 10 ... return x >>> df.apply( ... custom_set_item, ... axis=1, ... output_type="dataframe", ... dtypes=df.dtypes.copy(), ... ).execute() A B 1 [('k1', 1), ('k2', 10)] A 2 [('k1', 3), ('k2', 10)] B 3 <NA> C """ if isinstance(func, (list, dict)): return df.aggregate(func, axis) output_types = kwds.pop("output_types", None) object_type = kwds.pop("object_type", None) output_types = validate_output_types( output_type=output_type, output_types=output_types, object_type=object_type ) output_type = output_types[0] if output_types else None if skip_infer and output_type is None: output_type = OutputType.df_or_series # calling member function if isinstance(func, str): func = getattr(df, func) sig = inspect.getfullargspec(func) if "axis" in sig.args: kwds["axis"] = axis return func(*args, **kwds) op = ApplyOperator( func=func, axis=axis, raw=raw, result_type=result_type, args=args, kwds=kwds, output_type=output_type, elementwise=elementwise, ) return op(df, dtypes=dtypes, dtype=dtype, name=name, index=index) def series_apply( series, func, convert_dtype=True, output_type=None, args=(), dtypes=None, dtype=None, name=None, index=None, skip_infer=False, **kwds, ): """ Invoke function on values of Series. Can be ufunc (a NumPy function that applies to the entire Series) or a Python function that only works on single values. Parameters ---------- func : function Python function or NumPy ufunc to apply. convert_dtype : bool, default True Try to find better dtype for elementwise function results. If False, leave as dtype=object. output_type : {'dataframe', 'series'}, default None Specify type of returned object. See `Notes` for more details. dtypes : Series, default None Specify dtypes of returned DataFrames. See `Notes` for more details. dtype : numpy.dtype, default None Specify dtype of returned Series. See `Notes` for more details. name : str, default None Specify name of returned Series. See `Notes` for more details. index : Index, default None Specify index of returned object. See `Notes` for more details. args : tuple Positional arguments passed to func after the series value. skip_infer: bool, default False Whether infer dtypes when dtypes or output_type is not specified. **kwds Additional keyword arguments passed to func. Returns ------- Series or DataFrame If func returns a Series object the result will be a DataFrame. See Also -------- Series.map: For element-wise operations. Series.agg: Only perform aggregating type operations. Series.transform: Only perform transforming type operations. Notes ----- When deciding output dtypes and shape of the return value, MaxFrame will try applying ``func`` onto a mock Series, and the apply call may fail. When this happens, you need to specify the type of apply call (DataFrame or Series) in output_type. * For DataFrame output, you need to specify a list or a pandas Series as ``dtypes`` of output DataFrame. ``index`` of output can also be specified. * For Series output, you need to specify ``dtype`` and ``name`` of output Series. * For any input with data type ``pandas.ArrowDtype(pyarrow.MapType)``, it will always be converted to a Python dict. And for any output with this data type, it must be returned as a Python dict as well. Examples -------- Create a series with typical summer temperatures for each city. >>> import maxframe.tensor as mt >>> import maxframe.dataframe as md >>> s = md.Series([20, 21, 12], ... index=['London', 'New York', 'Helsinki']) >>> s.execute() London 20 New York 21 Helsinki 12 dtype: int64 Square the values by defining a function and passing it as an argument to ``apply()``. >>> def square(x): ... return x ** 2 >>> s.apply(square).execute() London 400 New York 441 Helsinki 144 dtype: int64 Square the values by passing an anonymous function as an argument to ``apply()``. >>> s.apply(lambda x: x ** 2).execute() London 400 New York 441 Helsinki 144 dtype: int64 Define a custom function that needs additional positional arguments and pass these additional arguments using the ``args`` keyword. >>> def subtract_custom_value(x, custom_value): ... return x - custom_value >>> s.apply(subtract_custom_value, args=(5,)).execute() London 15 New York 16 Helsinki 7 dtype: int64 Define a custom function that takes keyword arguments and pass these arguments to ``apply``. >>> def add_custom_values(x, **kwargs): ... for month in kwargs: ... x += kwargs[month] ... return x >>> s.apply(add_custom_values, june=30, july=20, august=25).execute() London 95 New York 96 Helsinki 87 dtype: int64 Create a series with a map type. >>> import pyarrow as pa >>> from maxframe.lib.dtypes_extension import dict_ >>> s = md.Series( ... data=[[("k1", 1), ("k2", 2)], [("k1", 3)], None], ... index=[1, 2, 3], ... dtype=dict_(pa.string(), pa.int64()), ... ) >>> s.execute() 1 [('k1', 1), ('k2', 2)] 2 [('k1', 3)] 3 <NA> dtype: map<string, int64>[pyarrow] Define a function that updates the map type with a new key-value pair. >>> def custom_set_item(x): ... if x is not None: ... x["k2"] = 10 ... return x >>> s.apply(custom_set_item, output_type="series", dtype=dict_(pa.string(), pa.int64())).execute() 1 [('k1', 1), ('k2', 10)] 2 [('k1', 3), ('k2', 10)] 3 <NA> dtype: map<string, int64>[pyarrow] """ if isinstance(func, (list, dict)): return series.aggregate(func) # calling member function if isinstance(func, str): func_body = getattr(series, func, None) if func_body is not None: return func_body(*args, **kwds) func_str = func func = getattr(np, func_str, None) if func is None: raise AttributeError( f"'{func_str!r}' is not a valid function " f"for '{type(series).__name__}' object" ) if skip_infer and output_type is None: output_type = OutputType.df_or_series output_types = kwds.pop("output_types", None) object_type = kwds.pop("object_type", None) output_types = validate_output_types( output_type=output_type, output_types=output_types, object_type=object_type ) output_type = output_types[0] if output_types else OutputType.series op = ApplyOperator( func=func, convert_dtype=convert_dtype, args=args, kwds=kwds, output_type=output_type, ) return op(series, dtypes=dtypes, dtype=dtype, name=name, index=index)