core/maxframe/dataframe/reduction/aggregation.py (293 lines of code) (raw):

# Copyright 1999-2025 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import copy import functools import itertools from collections import OrderedDict from collections.abc import Iterable from typing import List import numpy as np import pandas as pd from ... import opcodes from ... import tensor as maxframe_tensor from ...core import ENTITY_TYPE, OutputType, enter_mode from ...serialization.serializables import AnyField, BoolField, DictField, ListField from ...typing_ import TileableType from ...utils import lazy_import, pd_release_version from ..operators import DataFrameOperator, DataFrameOperatorMixin from ..utils import build_df, build_empty_df, build_series, parse_index, validate_axis from .core import ( CustomReduction, ReductionAggStep, ReductionCompiler, ReductionPostStep, ReductionPreStep, ) cp = lazy_import("cupy", rename="cp") cudf = lazy_import("cudf") _agg_size_as_series = pd_release_version >= (1, 3, 0) def where_function(cond, var1, var2): if hasattr(var1, "ndim") and var1.ndim >= 1: return var1.where(cond, var2) elif isinstance(var1, ENTITY_TYPE): return maxframe_tensor.where(cond, var1, var2) else: return np.where(cond, var1, var2).item() _agg_functions = { "sum": lambda x, skipna=True: x.sum(skipna=skipna), "prod": lambda x, skipna=True: x.prod(skipna=skipna), "product": lambda x, skipna=True: x.product(skipna=skipna), "min": lambda x, skipna=True: x.min(skipna=skipna), "max": lambda x, skipna=True: x.max(skipna=skipna), "all": lambda x, skipna=True: x.all(skipna=skipna), "any": lambda x, skipna=True: x.any(skipna=skipna), "count": lambda x: x.count(), "size": lambda x: x._reduction_size(), "mean": lambda x, skipna=True: x.mean(skipna=skipna), "var": lambda x, skipna=True, ddof=1: x.var(skipna=skipna, ddof=ddof), "std": lambda x, skipna=True, ddof=1: x.std(skipna=skipna, ddof=ddof), "sem": lambda x, skipna=True, ddof=1: x.sem(skipna=skipna, ddof=ddof), "skew": lambda x, skipna=True, bias=False: x.skew(skipna=skipna, bias=bias), "kurt": lambda x, skipna=True, bias=False: x.kurt(skipna=skipna, bias=bias), "kurtosis": lambda x, skipna=True, bias=False: x.kurtosis(skipna=skipna, bias=bias), "nunique": lambda x: x.nunique(), "median": lambda x, skipna=True: x.median(skipna=skipna), } class DataFrameAggregate(DataFrameOperator, DataFrameOperatorMixin): _op_type_ = opcodes.AGGREGATE raw_func = AnyField("raw_func") raw_func_kw = DictField("raw_func_kw") func = AnyField("func") func_rename = ListField("func_rename", default=None) axis = AnyField("axis", default=0) numeric_only = BoolField("numeric_only") bool_only = BoolField("bool_only") pre_funcs: List[ReductionPreStep] = ListField("pre_funcs") agg_funcs: List[ReductionAggStep] = ListField("agg_funcs") post_funcs: List[ReductionPostStep] = ListField("post_funcs") @staticmethod def _filter_dtypes(op: "DataFrameAggregate", dtypes): if not op.numeric_only and not op.bool_only: return dtypes empty_df = build_empty_df(dtypes) return empty_df.select_dtypes( [np.number, np.bool_] if op.numeric_only else [np.bool_] ).dtypes def _calc_result_shape(self, df): if df.ndim == 2: if self.numeric_only: df = df.select_dtypes([np.number, np.bool_]) elif self.bool_only: df = df.select_dtypes([np.bool_]) if self.output_types[0] == OutputType.dataframe: test_obj = build_df(df, size=[2, 2], fill_value=[1, 2], ensure_string=True) else: test_obj = build_series( df, size=[2, 2], fill_value=[1, 2], name=df.name, ensure_string=True ) result_df = test_obj.agg(self.raw_func, axis=self.axis, **self.raw_func_kw) if isinstance(result_df, pd.DataFrame): self.output_types = [OutputType.dataframe] return result_df.dtypes, result_df.index elif isinstance(result_df, pd.Series): self.output_types = [OutputType.series] return pd.Series([result_df.dtype], index=[result_df.name]), result_df.index else: self.output_types = [OutputType.scalar] return np.array(result_df).dtype, None def __call__(self, df, output_type=None, dtypes=None, index=None): self._output_types = df.op.output_types normalize_reduction_funcs(self, ndim=df.ndim) compile_reduction_funcs(self, df) if output_type is None or dtypes is None: with enter_mode(kernel=False, build=False): dtypes, index = self._calc_result_shape(df) else: self.output_types = [output_type] if self.output_types[0] == OutputType.dataframe: if self.axis == 0: new_shape = (len(index), len(dtypes)) new_index = parse_index(index, store_data=True) else: new_shape = (df.shape[0], len(dtypes)) new_index = df.index_value return self.new_dataframe( [df], shape=new_shape, dtypes=dtypes, index_value=new_index, columns_value=parse_index(dtypes.index, store_data=True), ) elif self.output_types[0] == OutputType.series: if df.ndim == 1: new_shape = (len(index),) new_index = parse_index(index, store_data=True) elif self.axis == 0: new_shape = (len(index),) new_index = parse_index(index, store_data=True) else: new_shape = (df.shape[0],) new_index = df.index_value return self.new_series( [df], shape=new_shape, dtype=dtypes[0], name=dtypes.index[0], index_value=new_index, ) elif self.output_types[0] == OutputType.tensor: return self.new_tileable([df], dtype=dtypes, shape=(np.nan,)) else: return self.new_scalar([df], dtype=dtypes) def is_funcs_aggregate(func, func_kw=None, ndim=2): func_kw = func_kw or dict() if ndim == 1 and func is None: func, func_kw = func_kw, dict() to_check = [] if func is not None: if isinstance(func, (list, tuple)): to_check.extend(func) elif isinstance(func, dict): if ndim == 2: for f in func.values(): if isinstance(f, Iterable) and not isinstance(f, str): to_check.extend(f) else: to_check.append(f) else: if any(isinstance(v, tuple) for v in func.values()): raise TypeError("nested renamer is not supported") to_check.extend(func.values()) else: to_check.append(func) else: for v in func_kw.values(): if ( not isinstance(v, tuple) or len(v) != 2 or (not isinstance(v[1], str) and not callable(v[1])) ): raise TypeError("Must provide 'func' or tuples of (column, aggfunc).") else: to_check.append(v[1]) compiler = ReductionCompiler() for f in to_check: if f in _agg_functions: continue elif callable(f): try: if ndim == 2: compiler.add_function(f, 2, cols=["A", "B"]) else: compiler.add_function(f, 1) except ValueError: return False else: return False return True def normalize_reduction_funcs(op, ndim=None): raw_func = op.raw_func if ndim == 1 and raw_func is None: raw_func = op.raw_func_kw if raw_func is not None: if isinstance(raw_func, dict): if ndim == 2: new_func = OrderedDict() for k, v in raw_func.items(): if isinstance(v, str) or callable(v): new_func[k] = [v] else: new_func[k] = v op.func = new_func else: op.func = list(raw_func.values()) op.func_rename = list(raw_func.keys()) elif isinstance(raw_func, Iterable) and not isinstance(raw_func, str): op.func = list(raw_func) else: op.func = [raw_func] else: new_func = OrderedDict() new_func_names = OrderedDict() for k, v in op.raw_func_kw.items(): try: col_funcs = new_func[v[0]] col_func_names = new_func_names[v[0]] except KeyError: col_funcs = new_func[v[0]] = [] col_func_names = new_func_names[v[0]] = [] col_funcs.append(v[1]) col_func_names.append(k) op.func = new_func op.func_rename = functools.reduce( lambda a, b: a + b, new_func_names.values(), [] ) custom_idx = 0 if isinstance(op.func, list): custom_iter = (f for f in op.func if isinstance(f, CustomReduction)) else: custom_iter = (f for f in op.func.values() if isinstance(f, CustomReduction)) for r in custom_iter: if r.name == "<custom>": r.name = f"<custom_{custom_idx}>" custom_idx += 1 def _add_compiler_functions( op: "DataFrameAggregate", compiler: ReductionCompiler, cols=None ): if isinstance(op.func, list): func_iter = ((None, f) for f in op.func) cols_set = set(cols) if cols is not None else None else: assert cols is not None cols_set = set(cols) & set(op.func.keys()) if len(cols_set) == 0: return False func_iter = ((col, f) for col, funcs in op.func.items() for f in funcs) func_renames = ( op.func_rename if getattr(op, "func_rename", None) is not None else itertools.repeat(None) ) for func_rename, (col, f) in zip(func_renames, func_iter): if cols_set is not None and col is not None and col not in cols_set: continue func_name = None if isinstance(f, str): f, func_name = _agg_functions[f], f if func_rename is not None: func_name = func_rename ndim = 1 if cols is None else 2 func_cols = [col] if col is not None else None compiler.add_function(f, ndim, cols=func_cols, func_name=func_name) return True def compile_reduction_funcs(op: DataFrameAggregate, input: TileableType): compiler = ReductionCompiler(axis=getattr(op, "axis", 0)) cols = input.dtypes.index if input.ndim > 1 else None if _add_compiler_functions(op, compiler, cols=cols): compiled = compiler.compile() op.pre_funcs = compiled.pre_funcs op.agg_funcs = compiled.agg_funcs op.post_funcs = compiled.post_funcs def aggregate(df, func=None, axis=0, **kw): """ Aggregate using one or more operations over the specified axis. Parameters ---------- df : DataFrame, Series Object to aggregate. func : list or dict Function to use for aggregating the data. axis : {0 or ‘index’, 1 or ‘columns’}, default 0 If 0 or ‘index’: apply function to each column. If 1 or ‘columns’: apply function to each row. kw Keyword arguments to pass to func. Returns ------- scalar, Series or DataFrame The return can be: * scalar : when Series.agg is called with single function * Series : when DataFrame.agg is called with a single function * DataFrame : when DataFrame.agg is called with several functions Examples -------- >>> import maxframe.dataframe as md >>> df = md.DataFrame([[1, 2, 3], ... [4, 5, 6], ... [7, 8, 9], ... [np.nan, np.nan, np.nan]], ... columns=['A', 'B', 'C']).execute() Aggregate these functions over the rows. >>> df.agg(['sum', 'min']).execute() A B C min 1.0 2.0 3.0 sum 12.0 15.0 18.0 Different aggregations per column. >>> df.agg({'A' : ['sum', 'min'], 'B' : ['min', 'max']}).execute() A B max NaN 8.0 min 1.0 2.0 sum 12.0 NaN Aggregate different functions over the columns and rename the index of the resulting DataFrame. >>> df.agg(x=('A', 'max'), y=('B', 'min'), z=('C', 'mean')).execute() A B C x 7.0 NaN NaN y NaN 2.0 NaN z NaN NaN 6.0 >>> s = md.Series([1, 2, 3, 4]) >>> s.agg('min').execute() 1 >>> s.agg(['min', 'max']).execute() max 4 min 1 """ axis = validate_axis(axis, df) if ( df.ndim == 2 and isinstance(func, dict) and (df.op.output_types[0] == OutputType.series or axis == 1) ): raise NotImplementedError( "Currently cannot aggregate dicts over axis=1 on %s" % type(df).__name__ ) numeric_only = kw.pop("_numeric_only", None) bool_only = kw.pop("_bool_only", None) output_type = kw.pop("_output_type", None) dtypes = kw.pop("_dtypes", None) index = kw.pop("_index", None) if not is_funcs_aggregate(func, func_kw=kw, ndim=df.ndim): return df.transform(func, axis=axis, _call_agg=True) op = DataFrameAggregate( raw_func=copy.deepcopy(func), raw_func_kw=copy.deepcopy(kw), axis=axis, numeric_only=numeric_only, bool_only=bool_only, ) return op(df, output_type=output_type, dtypes=dtypes, index=index)