core/maxframe/dataframe/groupby/fill.py (74 lines of code) (raw):

# Copyright 1999-2025 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import pandas as pd from ... import opcodes from ...core import OutputType from ...serialization.serializables import AnyField, DictField, Int64Field, StringField from ..operators import DataFrameOperator, DataFrameOperatorMixin from ..utils import parse_index class GroupByFillOperator(DataFrameOperator, DataFrameOperatorMixin): _op_module_ = "dataframe.groupby" value = AnyField("value", default=None) method = StringField("method", default=None) axis = AnyField("axis", default=0) limit = Int64Field("limit", default=None) downcast = DictField("downcast", default=None) def _calc_out_dtypes(self, in_groupby): mock_groupby = in_groupby.op.build_mock_groupby() func_name = getattr(self, "_func_name") if func_name == "fillna": kw = {} if self.axis is not None: kw["axis"] = self.axis result_df = mock_groupby.fillna( value=self.value, method=self.method, limit=self.limit, downcast=self.downcast, **kw, ) else: result_df = getattr(mock_groupby, func_name)(limit=self.limit) if isinstance(result_df, pd.DataFrame): self.output_types = [OutputType.dataframe] return result_df.dtypes else: self.output_types = [OutputType.series] return result_df.name, result_df.dtype def __call__(self, groupby): in_df = groupby while in_df.op.output_types[0] not in (OutputType.dataframe, OutputType.series): in_df = in_df.inputs[0] out_dtypes = self._calc_out_dtypes(groupby) kw = in_df.params.copy() kw["index_value"] = parse_index(pd.RangeIndex(-1), groupby.key) if self.output_types[0] == OutputType.dataframe: kw.update( dict( columns_value=parse_index(out_dtypes.index, store_data=True), dtypes=out_dtypes, shape=(groupby.shape[0], len(out_dtypes)), ) ) else: name, dtype = out_dtypes kw.update(dtype=dtype, name=name, shape=(groupby.shape[0],)) return self.new_tileable([groupby], **kw) class GroupByFFill(GroupByFillOperator): _op_type_ = opcodes.FILL_NA _func_name = "ffill" class GroupByBFill(GroupByFillOperator): _op_type = opcodes.FILL_NA _func_name = "bfill" class GroupByFillNa(GroupByFillOperator): _op_type = opcodes.FILL_NA _func_name = "fillna" def ffill(groupby, limit=None): """ Forward fill the values. limit: int, default None Limit number of values to fill return: Series or DataFrame """ op = GroupByFFill(limit=limit) return op(groupby) def bfill(groupby, limit=None): """ Backward fill the values. limit: int, default None Limit number of values to fill return: Series or DataFrame """ op = GroupByBFill(limit=limit) return op(groupby) def fillna(groupby, value=None, method=None, axis=None, limit=None, downcast=None): """ Fill NA/NaN values using the specified method value: scalar, dict, Series, or DataFrame Value to use to fill holes (e.g. 0), alternately a dict/Series/DataFrame of values specifying which value to use for each index (for a Series) or column (for a DataFrame). Values not in the dict/Series/DataFrame will not be filled. This value cannot be a list. method: {'backfill','bfill','ffill',None}, default None axis: {0 or 'index', 1 or 'column'} limit: int, default None If method is specified, this is the maximum number of consecutive NaN values to forward/backward fill downcast: dict, default None A dict of item->dtype of what to downcast if possible, or the string ‘infer’ which will try to downcast to an appropriate equal type return: DataFrame or None """ op = GroupByFillNa( value=value, method=method, axis=axis, limit=limit, downcast=downcast ) return op(groupby)