# core/maxframe/dataframe/groupby/aggregation.py
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import logging
from typing import Callable, Dict

import numpy as np
import pandas as pd

from ... import opcodes
from ...core import ENTITY_TYPE, OutputType
from ...serialization.serializables import (
AnyField,
DictField,
Int32Field,
Int64Field,
ListField,
StringField,
)
from ...utils import lazy_import, pd_release_version
from ..core import GROUPBY_TYPE
from ..operators import DataFrameOperator, DataFrameOperatorMixin
from ..reduction.aggregation import (
compile_reduction_funcs,
is_funcs_aggregate,
normalize_reduction_funcs,
)
from ..utils import is_cudf, parse_index

cp = lazy_import("cupy", rename="cp")
cudf = lazy_import("cudf")

logger = logging.getLogger(__name__)

# Heuristic thresholds for choosing the aggregation method from sampled
# chunk data: coefficient of variation of the sampled sizes, and the mean
# ratio of aggregated size to raw size.
CV_THRESHOLD = 0.2
MEAN_RATIO_THRESHOLD = 2 / 3

# pandas releases later than 1.0 can get groups from a groupby object
# even when it was created with as_index=False
_support_get_group_without_as_index = pd_release_version[:2] > (1, 0)

class SizeRecorder:
    """Record raw and aggregated sizes of executed chunks so that the
    aggregation ratio can be estimated afterwards."""

    def __init__(self):
        self._raw_records = []
        self._agg_records = []

    def record(self, raw_record: int, agg_record: int):
        self._raw_records.append(raw_record)
        self._agg_records.append(agg_record)

    def get(self):
        return self._raw_records, self._agg_records
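
# A minimal usage sketch for SizeRecorder (illustrative; the real call
# sites live in the execution layer): each executed chunk reports its raw
# input size together with the size of its aggregated output.
#
# >>> recorder = SizeRecorder()
# >>> recorder.record(raw_record=1000, agg_record=10)
# >>> recorder.get()
# ([1000], [10])
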
_agg_functions = {
"sum": lambda x: x.sum(),
"prod": lambda x: x.prod(),
"product": lambda x: x.product(),
"min": lambda x: x.min(),
"max": lambda x: x.max(),
"all": lambda x: x.all(),
"any": lambda x: x.any(),
"count": lambda x: x.count(),
"size": lambda x: x._reduction_size(),
"mean": lambda x: x.mean(),
"var": lambda x, ddof=1: x.var(ddof=ddof),
"std": lambda x, ddof=1: x.std(ddof=ddof),
"sem": lambda x, ddof=1: x.sem(ddof=ddof),
"skew": lambda x, bias=False: x.skew(bias=bias),
"kurt": lambda x, bias=False: x.kurt(bias=bias),
"kurtosis": lambda x, bias=False: x.kurtosis(bias=bias),
"nunique": lambda x: x.nunique(),
"median": lambda x: x.median(),
}
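
# A sketch of how the dispatch table above is consumed (the actual lookup
# happens during function compilation): a registered name resolves to a
# callable applied to the grouped or pre-processed data, with keyword
# overrides such as ddof passed through. Most entries also work on plain
# pandas objects, e.g.
#
# >>> _agg_functions["var"](pd.Series([1.0, 2.0, 3.0]), ddof=0)
# 0.6666666666666666
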
_series_col_name = "col_name"


def _patch_groupby_kurt():
    """Backfill GroupBy.kurt / kurtosis on pandas builds that lack them."""
    try:
        from pandas.core.groupby import DataFrameGroupBy, SeriesGroupBy

        if not hasattr(DataFrameGroupBy, "kurt"):  # pragma: no branch

            def _kurt_by_frame(a, *args, **kwargs):
                # compute kurtosis through a frame so that keyword
                # arguments such as numeric_only are honored
                data = a.to_frame().kurt(*args, **kwargs).iloc[0]
                if is_cudf(data):  # pragma: no cover
                    data = data.copy()
                return data

            def _group_kurt(x, *args, **kwargs):
                if kwargs.get("numeric_only") is not None:
                    return x.agg(functools.partial(_kurt_by_frame, *args, **kwargs))
                else:
                    return x.agg(functools.partial(pd.Series.kurt, *args, **kwargs))

            DataFrameGroupBy.kurt = DataFrameGroupBy.kurtosis = _group_kurt
            SeriesGroupBy.kurt = SeriesGroupBy.kurtosis = _group_kurt
    except (AttributeError, ImportError):  # pragma: no cover
        pass

_patch_groupby_kurt()
del _patch_groupby_kurt
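
# With the patch applied, groupby kurtosis works even on pandas builds
# that have no native DataFrameGroupBy.kurt (illustrative):
#
# >>> pdf = pd.DataFrame({"a": [1, 1, 2, 2], "b": [1.0, 2.0, 3.0, 10.0]})
# >>> pdf.groupby("a").kurt()  # resolves via the patched method if needed
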
def build_mock_agg_result(
groupby: GROUPBY_TYPE,
groupby_params: Dict,
raw_func: Callable,
**raw_func_kw,
):
try:
agg_result = groupby.op.build_mock_groupby().aggregate(raw_func, **raw_func_kw)
except ValueError:
if (
groupby_params.get("as_index") or _support_get_group_without_as_index
): # pragma: no cover
raise
agg_result = (
groupby.op.build_mock_groupby(as_index=True)
.aggregate(raw_func, **raw_func_kw)
.to_frame()
)
agg_result.index.names = [None] * agg_result.index.nlevels
return agg_result
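
# The idea behind build_mock_agg_result, sketched with plain pandas: run
# the requested aggregation on a tiny mock groupby that shares the real
# schema, then read the output metadata (dtypes, index, columns) off the
# result. mock_df below is a hypothetical stand-in for the mock data that
# build_mock_groupby() produces.
#
# >>> mock_df = pd.DataFrame({"a": [1, 2], "b": [0.1, 0.2]})
# >>> mock_df.groupby("a").aggregate("sum").dtypes
# b    float64
# dtype: object
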
class DataFrameGroupByAgg(DataFrameOperator, DataFrameOperatorMixin):
_op_type_ = opcodes.GROUPBY_AGG
raw_func = AnyField("raw_func")
raw_func_kw = DictField("raw_func_kw")
func = AnyField("func")
func_rename = ListField("func_rename", default=None)
raw_groupby_params = DictField("raw_groupby_params")
groupby_params = DictField("groupby_params")
method = StringField("method")

    # fields used during chunk-level execution
chunk_store_limit = Int64Field("chunk_store_limit")
pre_funcs = ListField("pre_funcs")
agg_funcs = ListField("agg_funcs")
post_funcs = ListField("post_funcs")
index_levels = Int32Field("index_levels")
size_recorder_name = StringField("size_recorder_name")

    def _set_inputs(self, inputs):
super()._set_inputs(inputs)
inputs_iter = iter(self._inputs[1:])
if len(self._inputs) > 1:
by = []
for v in self.groupby_params["by"]:
if isinstance(v, ENTITY_TYPE):
by.append(next(inputs_iter))
else:
by.append(v)
self.groupby_params["by"] = by

    def _get_inputs(self, inputs):
if isinstance(self.groupby_params["by"], list):
for v in self.groupby_params["by"]:
if isinstance(v, ENTITY_TYPE):
inputs.append(v)
return inputs

    def _get_index_levels(self, groupby, mock_index):
if not self.groupby_params["as_index"]:
try:
as_index_agg_df = groupby.op.build_mock_groupby(
as_index=True
).aggregate(self.raw_func, **self.raw_func_kw)
except: # noqa: E722 # nosec # pylint: disable=bare-except
# handling cases like mdf.groupby("b", as_index=False).b.agg({"c": "count"})
if isinstance(self.groupby_params["by"], list):
return len(self.groupby_params["by"])
raise # pragma: no cover
pd_index = as_index_agg_df.index
else:
pd_index = mock_index
return 1 if not isinstance(pd_index, pd.MultiIndex) else len(pd_index.levels)
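
    # For instance (plain pandas), grouping by two keys produces a
    # two-level MultiIndex, so two index levels have to be carried through
    # intermediate steps:
    #
    # >>> pdf = pd.DataFrame({"a": [1], "b": [2], "c": [3]})
    # >>> len(pdf.groupby(["a", "b"]).sum().index.levels)
    # 2
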
    def _fix_as_index(self, result_index: pd.Index):
        # check whether as_index=False actually takes effect
        if isinstance(result_index, pd.MultiIndex):
            # with a MultiIndex, as_index=False definitely takes no effect
            self.groupby_params["as_index"] = True
        elif result_index.name is not None:
            # when the index is not a MultiIndex but has a name,
            # as_index=False takes no effect either
            self.groupby_params["as_index"] = True

    def _call_dataframe(self, groupby, input_df):
compile_reduction_funcs(self, input_df)
agg_df = build_mock_agg_result(
groupby, self.groupby_params, self.raw_func, **self.raw_func_kw
)
shape = (np.nan, agg_df.shape[1])
if isinstance(agg_df.index, pd.RangeIndex):
index_value = parse_index(
pd.RangeIndex(-1), groupby.key, groupby.index_value.key
)
else:
index_value = parse_index(
agg_df.index, groupby.key, groupby.index_value.key
)
        # make sure as_index=False takes effect
self._fix_as_index(agg_df.index)
# determine num of indices to group in intermediate steps
self.index_levels = self._get_index_levels(groupby, agg_df.index)
inputs = self._get_inputs([input_df])
return self.new_dataframe(
inputs,
shape=shape,
dtypes=agg_df.dtypes,
index_value=index_value,
columns_value=parse_index(agg_df.columns, store_data=True),
)

    def _call_series(self, groupby, in_series):
compile_reduction_funcs(self, in_series)
agg_result = build_mock_agg_result(
groupby, self.groupby_params, self.raw_func, **self.raw_func_kw
)
        # make sure as_index=False takes effect
self._fix_as_index(agg_result.index)
index_value = parse_index(
agg_result.index, groupby.key, groupby.index_value.key
)
inputs = self._get_inputs([in_series])
# determine num of indices to group in intermediate steps
self.index_levels = self._get_index_levels(groupby, agg_result.index)
# update value type
if isinstance(agg_result, pd.DataFrame):
return self.new_dataframe(
inputs,
shape=(np.nan, len(agg_result.columns)),
dtypes=agg_result.dtypes,
index_value=index_value,
columns_value=parse_index(agg_result.columns, store_data=True),
)
else:
return self.new_series(
inputs,
shape=(np.nan,),
dtype=agg_result.dtype,
name=agg_result.name,
index_value=index_value,
)

    def __call__(self, groupby):
normalize_reduction_funcs(self, ndim=groupby.ndim)
df = groupby
while df.op.output_types[0] not in (OutputType.dataframe, OutputType.series):
df = df.inputs[0]
if self.raw_func == "size":
self.output_types = [OutputType.series]
else:
self.output_types = (
[OutputType.dataframe]
if groupby.op.output_types[0] == OutputType.dataframe_groupby
else [OutputType.series]
)
if self.output_types[0] == OutputType.dataframe:
return self._call_dataframe(groupby, df)
else:
return self._call_series(groupby, df)
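
# Note on the output types chosen in __call__ above: "size" always yields
# a Series, otherwise the output mirrors the kind of groupby (illustrative,
# assuming a MaxFrame DataFrame df with column "A"):
#
# >>> df.groupby("A").agg("size")  # Series: one size per group
# >>> df.groupby("A").agg("sum")   # DataFrame for a dataframe groupby
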

def agg(groupby, func=None, method="auto", *args, **kwargs):
    """
    Aggregate using one or more operations on grouped data.

    Parameters
    ----------
    groupby : MaxFrame GroupBy
        GroupBy data.
    func : str or list-like
        Aggregation functions.
    method : {'auto', 'shuffle', 'tree'}, default 'auto'
        The 'tree' method provides better performance, 'shuffle' is
        recommended if the aggregated result is very large, and 'auto'
        uses 'shuffle' in distributed mode and 'tree' in local mode.

    Returns
    -------
    Series or DataFrame
        Aggregated result.

    Examples
    --------
    >>> import maxframe.dataframe as md
    >>> df = md.DataFrame(
    ...     {
    ...         "A": [1, 1, 2, 2],
    ...         "B": [1, 2, 3, 4],
    ...         "C": [0.362838, 0.227877, 1.267767, -0.562860],
    ...     }
    ... ).execute()
       A  B         C
    0  1  1  0.362838
    1  1  2  0.227877
    2  2  3  1.267767
    3  2  4 -0.562860

    The aggregation is for each column.

    >>> df.groupby('A').agg('min').execute()
       B         C
    A
    1  1  0.227877
    2  3 -0.562860

    Multiple aggregations.

    >>> df.groupby('A').agg(['min', 'max']).execute()
        B             C
      min max       min       max
    A
    1   1   2  0.227877  0.362838
    2   3   4 -0.562860  1.267767

    Different aggregations per column.

    >>> df.groupby('A').agg({'B': ['min', 'max'], 'C': 'sum'}).execute()
        B             C
      min max       sum
    A
    1   1   2  0.590715
    2   3   4  0.704907

    To control the output names with different aggregations per column,
    pandas supports "named aggregation".

    >>> from maxframe.dataframe.groupby import NamedAgg
    >>> df.groupby("A").agg(
    ...     b_min=NamedAgg(column="B", aggfunc="min"),
    ...     c_sum=NamedAgg(column="C", aggfunc="sum")).execute()
       b_min     c_sum
    A
    1      1  0.590715
    2      3  0.704907
    """
    # When performing a computation on grouped data, we do not shuffle in
    # the groupby stage; the shuffle happens after aggregation.
    if not isinstance(groupby, GROUPBY_TYPE):
        raise TypeError(f"Input should be a groupby object, not {type(groupby)}")

    if method is None:
        method = "auto"
    if method not in ["shuffle", "tree", "auto"]:
        raise ValueError(
            f"Method {method} is not available, please specify 'tree' or 'shuffle'"
        )
if not is_funcs_aggregate(func, ndim=groupby.ndim):
# pass index to transform, otherwise it will lose name info for index
agg_result = build_mock_agg_result(
groupby, groupby.op.groupby_params, func, **kwargs
)
if isinstance(agg_result.index, pd.RangeIndex):
# set -1 to represent unknown size for RangeIndex
index_value = parse_index(
pd.RangeIndex(-1), groupby.key, groupby.index_value.key
)
else:
index_value = parse_index(
agg_result.index, groupby.key, groupby.index_value.key
)
return groupby.transform(
func, *args, _call_agg=True, index=index_value, **kwargs
)
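
    # Illustrative note on the fall-back above: a callable that does not
    # aggregate (e.g. one returning a same-shaped result) is routed to
    # transform instead of DataFrameGroupByAgg, for example (hypothetical):
    #
    # >>> df.groupby("A").agg(lambda x: x - x.mean())
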
agg_op = DataFrameGroupByAgg(
raw_func=func,
raw_func_kw=kwargs,
method=method,
raw_groupby_params=groupby.op.groupby_params,
groupby_params=groupby.op.groupby_params,
)
return agg_op(groupby)
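
# Illustrative end-to-end usage (assuming an executable MaxFrame session;
# semantics mirror the docstring above). method="tree" forces tree-based
# aggregation instead of the automatic choice:
#
# >>> import maxframe.dataframe as md
# >>> df = md.DataFrame({"A": [1, 1, 2], "B": [1, 2, 3]})
# >>> df.groupby("A").agg("sum", method="tree").execute()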