core/maxframe/dataframe/operators.py (216 lines of code) (raw):

# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
import pandas as pd

from ..core import ENTITY_TYPE, OutputType
from ..core.operator import Operator, ShuffleProxy, TileableOperatorMixin
from ..tensor.core import TENSOR_TYPE
from ..tensor.datasource import tensor as astensor
from .core import DATAFRAME_TYPE, SERIES_TYPE


def _check_single_output(op, type_name):
    """Raise ``TypeError`` unless ``op.output_limit`` equals 1.

    Shared guard for the singular ``new_*`` constructors of
    :class:`DataFrameOperatorMixin`, so that every one of them reports the
    failure with the same wording.

    Parameters
    ----------
    op
        Operator whose ``output_limit`` attribute is inspected.
    type_name : str
        Name of the object being created; interpolated into the message.

    Raises
    ------
    TypeError
        If ``op.output_limit`` is not 1.
    """
    # ``output_limit`` is not declared on the mixin itself (it has empty
    # ``__slots__``), hence the dynamic lookup kept from the original code.
    if getattr(op, "output_limit") != 1:
        raise TypeError(f"cannot new {type_name} with more than 1 outputs")


class DataFrameOperatorMixin(TileableOperatorMixin):
    """Mixin adding DataFrame-flavoured ``new_*`` constructors to an operator.

    Every plural ``new_*s`` method records the matching :class:`OutputType`
    in ``self._output_types`` and then delegates to ``self.new_tileables``.
    Every singular ``new_*`` method first requires ``self.output_limit == 1``
    and returns element 0 of the plural method's result.
    """

    __slots__ = ()
    _op_module_ = "dataframe"

    def new_dataframes(
        self,
        inputs,
        shape=None,
        dtypes=None,
        index_value=None,
        columns_value=None,
        chunks=None,
        nsplits=None,
        output_limit=None,
        kws=None,
        **kw
    ):
        """Set the output type to ``dataframe`` and call ``new_tileables``.

        All arguments are forwarded unchanged to ``self.new_tileables``;
        its result is returned as is.
        """
        setattr(self, "_output_types", [OutputType.dataframe])
        return self.new_tileables(
            inputs,
            shape=shape,
            dtypes=dtypes,
            index_value=index_value,
            columns_value=columns_value,
            chunks=chunks,
            nsplits=nsplits,
            output_limit=output_limit,
            kws=kws,
            **kw
        )

    def new_dataframe(
        self,
        inputs,
        shape=None,
        dtypes=None,
        index_value=None,
        columns_value=None,
        **kw
    ):
        """Create a single dataframe output.

        Returns element 0 of :meth:`new_dataframes`.

        Raises
        ------
        TypeError
            If ``self.output_limit`` is not 1.
        """
        _check_single_output(self, "DataFrame")
        return self.new_dataframes(
            inputs,
            shape=shape,
            dtypes=dtypes,
            index_value=index_value,
            columns_value=columns_value,
            **kw
        )[0]

    def new_seriess(
        self,
        inputs,
        shape=None,
        dtype=None,
        index_value=None,
        name=None,
        chunks=None,
        nsplits=None,
        output_limit=None,
        kws=None,
        **kw
    ):
        """Set the output type to ``series`` and call ``new_tileables``.

        All arguments are forwarded unchanged to ``self.new_tileables``;
        its result is returned as is.
        """
        setattr(self, "_output_types", [OutputType.series])
        return self.new_tileables(
            inputs,
            shape=shape,
            dtype=dtype,
            index_value=index_value,
            name=name,
            chunks=chunks,
            nsplits=nsplits,
            output_limit=output_limit,
            kws=kws,
            **kw
        )

    def new_series(
        self, inputs, shape=None, dtype=None, index_value=None, name=None, **kw
    ):
        """Create a single series output.

        Returns element 0 of :meth:`new_seriess`.

        Raises
        ------
        TypeError
            If ``self.output_limit`` is not 1.
        """
        _check_single_output(self, "Series")
        return self.new_seriess(
            inputs,
            shape=shape,
            dtype=dtype,
            index_value=index_value,
            name=name,
            **kw
        )[0]

    def new_df_or_series(self, inputs, **kw):
        """Set the output type to ``df_or_series`` and return the first tileable.

        Unlike the other singular constructors, this one performs no
        ``output_limit`` check before indexing the ``new_tileables`` result.
        """
        setattr(self, "_output_types", [OutputType.df_or_series])
        return self.new_tileables(inputs, **kw)[0]

    def new_indexes(
        self,
        inputs,
        shape=None,
        dtype=None,
        index_value=None,
        name=None,
        chunks=None,
        nsplits=None,
        output_limit=None,
        kws=None,
        **kw
    ):
        """Set the output type to ``index`` and call ``new_tileables``.

        All arguments are forwarded unchanged to ``self.new_tileables``;
        its result is returned as is.
        """
        setattr(self, "_output_types", [OutputType.index])
        return self.new_tileables(
            inputs,
            shape=shape,
            dtype=dtype,
            index_value=index_value,
            name=name,
            chunks=chunks,
            nsplits=nsplits,
            output_limit=output_limit,
            kws=kws,
            **kw
        )

    def new_index(
        self, inputs, shape=None, dtype=None, index_value=None, name=None, **kw
    ):
        """Create a single index output.

        Returns element 0 of :meth:`new_indexes`.

        Raises
        ------
        TypeError
            If ``self.output_limit`` is not 1.
        """
        _check_single_output(self, "Index")
        return self.new_indexes(
            inputs,
            shape=shape,
            dtype=dtype,
            index_value=index_value,
            name=name,
            **kw
        )[0]

    def new_scalars(
        self, inputs, dtype=None, chunks=None, output_limit=None, kws=None, **kw
    ):
        """Set the output type to ``scalar`` and call ``new_tileables``.

        ``shape`` and ``nsplits`` are always passed as empty tuples; the
        remaining arguments are forwarded unchanged.
        """
        setattr(self, "_output_types", [OutputType.scalar])
        return self.new_tileables(
            inputs,
            shape=(),
            dtype=dtype,
            chunks=chunks,
            nsplits=(),
            output_limit=output_limit,
            kws=kws,
            **kw
        )

    def new_scalar(self, inputs, dtype=None, **kw):
        """Create a single scalar output.

        Returns element 0 of :meth:`new_scalars`.

        Raises
        ------
        TypeError
            If ``self.output_limit`` is not 1.
        """
        # The message previously said "tensor"; it now names the object this
        # method actually creates, matching the sibling constructors.
        _check_single_output(self, "Scalar")
        return self.new_scalars(inputs, dtype=dtype, **kw)[0]

    def new_categoricals(
        self,
        inputs,
        shape=None,
        dtype=None,
        categories_value=None,
        chunks=None,
        nsplits=None,
        output_limit=None,
        kws=None,
        **kw
    ):
        """Set the output type to ``categorical`` and call ``new_tileables``.

        All arguments are forwarded unchanged to ``self.new_tileables``;
        its result is returned as is.
        """
        setattr(self, "_output_types", [OutputType.categorical])
        return self.new_tileables(
            inputs,
            shape=shape,
            dtype=dtype,
            categories_value=categories_value,
            chunks=chunks,
            nsplits=nsplits,
            output_limit=output_limit,
            kws=kws,
            **kw
        )

    def new_categorical(
        self, inputs, shape=None, dtype=None, categories_value=None, **kw
    ):
        """Create a single categorical output.

        Returns element 0 of :meth:`new_categoricals`.

        Raises
        ------
        TypeError
            If ``self.output_limit`` is not 1.
        """
        _check_single_output(self, "Categorical")
        return self.new_categoricals(
            inputs,
            shape=shape,
            dtype=dtype,
            categories_value=categories_value,
            **kw
        )[0]

    @classmethod
    def _process_groupby_params(cls, groupby_params):
        """Return a copy of ``groupby_params`` with entity keys turned into chunks.

        The input mapping is copied with ``.copy()`` and is not modified.
        When ``groupby_params["by"]`` is a list, each element that is an
        ``ENTITY_TYPE`` is replaced by the first chunk of
        ``cls.concat_tileable_chunks(element)``; other elements are kept.
        When ``"by"`` is not a list, the copy is returned unchanged.
        """
        new_groupby_params = groupby_params.copy()
        by = groupby_params["by"]
        if isinstance(by, list):
            new_groupby_params["by"] = [
                cls.concat_tileable_chunks(v).chunks[0]
                if isinstance(v, ENTITY_TYPE)
                else v
                for v in by
            ]
        return new_groupby_params

    @classmethod
    def _get_groupby_inputs(cls, groupby, groupby_params):
        """Collect the tileable-level and chunk-level inputs of a groupby.

        Parameters
        ----------
        groupby
            Object exposing ``chunks`` and ``op.groupby_params``.
        groupby_params : dict
            Parameters whose ``"by"`` entry is paired positionally with
            ``groupby.op.groupby_params["by"]``.

        Returns
        -------
        tuple of (list, list)
            ``inputs`` starts with ``groupby``; ``chunk_inputs`` starts with
            ``groupby.chunks``. For every position where the operator's own
            ``"by"`` element is an ``ENTITY_TYPE``, that element is appended
            to ``inputs`` and the element of ``groupby_params["by"]`` at the
            same position is appended to ``chunk_inputs``.
        """
        inputs = [groupby]
        chunk_inputs = list(groupby.chunks)
        if isinstance(groupby_params["by"], list):
            for chunk_v, v in zip(
                groupby_params["by"], groupby.op.groupby_params["by"]
            ):
                # The entity test is made on the operator's original key;
                # ``chunk_v`` is simply its positional counterpart.
                if isinstance(v, ENTITY_TYPE):
                    inputs.append(v)
                    chunk_inputs.append(chunk_v)
        return inputs, chunk_inputs

    @staticmethod
    def _process_input(x):
        """Convert ``x`` into an object usable as an operator input.

        * ``DATAFRAME_TYPE`` / ``SERIES_TYPE`` instances and pandas scalars
          are returned unchanged.
        * ``pd.Series`` is wrapped with ``Series``; ``pd.DataFrame`` with
          ``DataFrame`` (both imported from ``.initializer``).
        * ``list``, ``tuple``, ``np.ndarray`` and ``TENSOR_TYPE`` values are
          passed through ``astensor``.

        Raises
        ------
        NotImplementedError
            If ``x`` matches none of the cases above.
        """
        # Imported inside the function, as in the original code.
        from .initializer import DataFrame, Series

        if isinstance(x, (DATAFRAME_TYPE, SERIES_TYPE)) or pd.api.types.is_scalar(x):
            return x
        elif isinstance(x, pd.Series):
            return Series(x)
        elif isinstance(x, pd.DataFrame):
            return DataFrame(x)
        elif isinstance(x, (list, tuple, np.ndarray, TENSOR_TYPE)):
            return astensor(x)
        raise NotImplementedError(
            f"unsupported input type: {type(x).__name__}"
        )


# Alias: the dataframe operator base is the generic ``Operator``.
DataFrameOperator = Operator


class DataFrameShuffleProxy(ShuffleProxy, DataFrameOperatorMixin):
    """``ShuffleProxy`` combined with the dataframe ``new_*`` constructors."""

    def __init__(self, sparse=None, output_types=None, **kwargs):
        """Forward to the parent initializer, passing ``output_types`` as
        the ``_output_types`` keyword argument."""
        super().__init__(sparse=sparse, _output_types=output_types, **kwargs)