core/maxframe/dataframe/groupby/cum.py (76 lines of code) (raw):
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas as pd
from ... import opcodes
from ...core import OutputType
from ...serialization.serializables import AnyField, BoolField
from ...utils import lazy_import
from ..operators import DataFrameOperator, DataFrameOperatorMixin
from ..utils import parse_index, validate_axis
# Handle for the ``cudf`` package obtained through ``lazy_import``; nothing in
# the visible part of this module references it.
# NOTE(review): presumably the real import is deferred until first attribute
# access — confirm against ``utils.lazy_import``.
cudf = lazy_import("cudf")
class GroupByCumReductionOperator(DataFrameOperatorMixin, DataFrameOperator):
    """Common base for cumulative operators applied to a groupby object.

    A concrete subclass is expected to define ``_func_name``: the name of the
    method that is invoked on the mock groupby in order to infer the output
    type (DataFrame or Series) and its dtype information.
    """

    _op_module_ = "dataframe.groupby"

    # Axis passed to the cumulative method; ``__call__`` overwrites it with
    # the result of ``validate_axis``.
    axis = AnyField("axis", default=None)
    # Only consulted when ``_func_name`` is ``"cumcount"``.
    ascending = BoolField("ascending", default=None)

    def __init__(self, output_types=None, **kw):
        """Pass ``output_types`` to the parent initializer as ``_output_types``."""
        super().__init__(_output_types=output_types, **kw)

    def _calc_out_dtypes(self, in_groupby):
        """Infer output metadata by running the operation on a mock groupby.

        Sets ``self.output_types`` as a side effect.

        Returns
        -------
        The ``dtypes`` of the mock result when it is a ``pd.DataFrame``;
        otherwise a ``(name, dtype)`` tuple taken from the mock result.
        """
        mock = in_groupby.op.build_mock_groupby()
        method_name = self._func_name

        # NOTE(review): recent pandas releases deprecate the ``axis`` keyword
        # on groupby cumulative methods — confirm against the pandas versions
        # this project supports.
        mocked = (
            mock.cumcount(ascending=self.ascending)
            if method_name == "cumcount"
            else getattr(mock, method_name)(axis=self.axis)
        )

        if not isinstance(mocked, pd.DataFrame):
            self.output_types = [OutputType.series]
            return mocked.name, mocked.dtype

        self.output_types = [OutputType.dataframe]
        return mocked.dtypes

    def __call__(self, groupby):
        """Create the output tileable for ``groupby``.

        Walks up the chain of first inputs until a DataFrame- or Series-typed
        tileable is found, copies its params, and overrides them with the
        inferred dtype information and a shape whose first dimension is
        ``groupby.shape[0]``.
        """
        frame_like = (OutputType.dataframe, OutputType.series)
        source = groupby
        while source.op.output_types[0] not in frame_like:
            source = source.inputs[0]

        # The axis must be settled before inference because the mock call
        # receives ``self.axis``.
        self.axis = validate_axis(self.axis or 0, source)
        inferred = self._calc_out_dtypes(groupby)

        params = source.params.copy()
        if self.output_types[0] == OutputType.dataframe:
            params.update(
                columns_value=parse_index(inferred.index, store_data=True),
                dtypes=inferred,
                shape=(groupby.shape[0], len(inferred)),
            )
        else:
            series_name, series_dtype = inferred
            params.update(
                dtype=series_dtype,
                name=series_name,
                shape=(groupby.shape[0],),
            )
        return self.new_tileable([groupby], **params)
class GroupByCummin(GroupByCumReductionOperator):
    """Groupby cumulative operator bound to ``opcodes.CUMMIN``."""

    _op_type_ = opcodes.CUMMIN
    # Method name looked up on the mock groupby during dtype inference.
    _func_name = "cummin"
class GroupByCummax(GroupByCumReductionOperator):
    """Groupby cumulative operator bound to ``opcodes.CUMMAX``."""

    _op_type_ = opcodes.CUMMAX
    # Method name looked up on the mock groupby during dtype inference.
    _func_name = "cummax"
class GroupByCumsum(GroupByCumReductionOperator):
    """Groupby cumulative operator bound to ``opcodes.CUMSUM``."""

    _op_type_ = opcodes.CUMSUM
    # Method name looked up on the mock groupby during dtype inference.
    _func_name = "cumsum"
class GroupByCumprod(GroupByCumReductionOperator):
    """Groupby cumulative operator bound to ``opcodes.CUMPROD``."""

    _op_type_ = opcodes.CUMPROD
    # Method name looked up on the mock groupby during dtype inference.
    _func_name = "cumprod"
class GroupByCumcount(GroupByCumReductionOperator):
    """Groupby cumulative operator bound to ``opcodes.CUMCOUNT``."""

    _op_type_ = opcodes.CUMCOUNT
    # This name selects the ``ascending``-based branch of dtype inference in
    # the base class instead of the ``axis``-based one.
    _func_name = "cumcount"
def cumcount(groupby, ascending: bool = True):
    """Apply a ``GroupByCumcount`` operator to ``groupby``.

    Parameters
    ----------
    groupby
        Object the operator is called with.
    ascending : bool, default True
        Stored on the operator as its ``ascending`` field.

    Returns
    -------
    The result of calling the operator on ``groupby``.
    """
    return GroupByCumcount(ascending=ascending)(groupby)
def cummin(groupby, axis=0):
    """Apply a ``GroupByCummin`` operator to ``groupby``.

    Parameters
    ----------
    groupby
        Object the operator is called with.
    axis : default 0
        Stored on the operator as its ``axis`` field.

    Returns
    -------
    The result of calling the operator on ``groupby``.
    """
    return GroupByCummin(axis=axis)(groupby)
def cummax(groupby, axis=0):
    """Apply a ``GroupByCummax`` operator to ``groupby``.

    Parameters
    ----------
    groupby
        Object the operator is called with.
    axis : default 0
        Stored on the operator as its ``axis`` field.

    Returns
    -------
    The result of calling the operator on ``groupby``.
    """
    return GroupByCummax(axis=axis)(groupby)
def cumprod(groupby, axis=0):
    """Apply a ``GroupByCumprod`` operator to ``groupby``.

    Parameters
    ----------
    groupby
        Object the operator is called with.
    axis : default 0
        Stored on the operator as its ``axis`` field.

    Returns
    -------
    The result of calling the operator on ``groupby``.
    """
    return GroupByCumprod(axis=axis)(groupby)
def cumsum(groupby, axis=0):
    """Apply a ``GroupByCumsum`` operator to ``groupby``.

    Parameters
    ----------
    groupby
        Object the operator is called with.
    axis : default 0
        Stored on the operator as its ``axis`` field.

    Returns
    -------
    The result of calling the operator on ``groupby``.
    """
    return GroupByCumsum(axis=axis)(groupby)