core/maxframe/dataframe/indexing/getitem.py (159 lines of code) (raw):

# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from numbers import Integral

import numpy as np
import pandas as pd

from ... import opcodes
from ...core import ENTITY_TYPE, OutputType
from ...serialization.serializables import AnyField, BoolField
from ...tensor.core import TENSOR_TYPE
from ...tensor.datasource import tensor as astensor
from ..core import DATAFRAME_TYPE, SERIES_TYPE
from ..operators import DataFrameOperator, DataFrameOperatorMixin
from ..utils import parse_index


class SeriesIndex(DataFrameOperator, DataFrameOperatorMixin):
    """Operator created by ``series_getitem`` for list, tuple or scalar labels.

    The kind of result depends on ``labels``: a ``list`` yields a series with
    one entry per label, anything else yields a scalar.
    """

    _op_module_ = "series"
    _op_type_ = opcodes.INDEX

    # Label(s) to look up in the series index.
    labels = AnyField("labels", default=None)
    # NOTE(review): not read anywhere in this module; purpose to be confirmed.
    is_intermediate = BoolField("is_intermediate", default=None)

    def __init__(self, output_types=None, **kw):
        super().__init__(_output_types=output_types, **kw)

    def __call__(self, series, name=None):
        """Build the result tileable for ``series``, keeping its dtype."""
        return self.new_tileable([series], dtype=series.dtype, name=name)

    def _new_tileables(self, inputs, kws=None, **kw):
        """Fill in output type, shape and index before delegating to the base.

        Overridden so that the output type is chosen automatically: a list of
        labels produces a series, every other kind of label a scalar.
        Values already supplied by the caller (``shape``, ``index_value``,
        preset output types) take precedence over the derived ones.
        """
        is_scalar = not isinstance(self.labels, list)

        # Only decide the output type when none has been set on the operator.
        if not getattr(self, "_output_types", None):
            self.output_types = [
                OutputType.scalar if is_scalar else OutputType.series
            ]

        requested_shape = kw.pop("shape", None)
        if requested_shape is None:
            requested_shape = () if is_scalar else (len(self.labels),)
        kw["shape"] = requested_shape

        if not is_scalar:
            # A falsy ``index_value`` is replaced by one built from the labels.
            kw["index_value"] = kw.pop("index_value", None) or parse_index(
                pd.Index(self.labels)
            )

        return super()._new_tileables(inputs, kws=kws, **kw)


class DataFrameIndex(DataFrameOperator, DataFrameOperatorMixin):
    """Operator created by ``dataframe_getitem``.

    Works in one of two modes: column selection when ``col_names`` is set,
    or row filtering by ``mask`` otherwise.
    """

    _op_type_ = opcodes.INDEX

    # Column label, or list of column labels, to select.
    col_names = AnyField("col_names", default=None)
    # for bool index
    mask = AnyField("mask", default=None)
    # Set by ``dataframe_getitem`` to whether the frame and the mask share the
    # same ``index_value.key``.
    identical_index = BoolField("identical_index")

    def __init__(self, output_types=None, **kw):
        # Falls back to a series output when no output type is given.
        super().__init__(_output_types=output_types or [OutputType.series], **kw)

    def _set_inputs(self, inputs):
        """Re-point entity-valued fields at the operator's registered inputs."""
        super()._set_inputs(inputs)
        if isinstance(self.col_names, ENTITY_TYPE):
            self.col_names = self._inputs[0]
        if isinstance(self.mask, ENTITY_TYPE):
            self.mask = self._inputs[-1]

    def __call__(self, df):
        """Create the output tileable for indexing ``df``.

        Returns a DataFrame for list-like column selection and for mask
        filtering, and a Series when a single column label resolves to
        exactly one dtype.
        """
        if self.col_names is None:
            # Mask filtering. The mask only becomes an operator input when it
            # is itself a series / dataframe / tensor entity.
            operands = (
                [df, self.mask]
                if isinstance(self.mask, (SERIES_TYPE, DATAFRAME_TYPE, TENSOR_TYPE))
                else [df]
            )
            empty_index = pd.Index(
                [],
                dtype=df.index_value.to_pandas().dtype,
                name=df.index_value.name,
            )
            return self.new_dataframe(
                operands,
                # Row count is recorded as NaN (presumably "unknown until
                # execution" - TODO confirm against the core shape contract).
                shape=(np.nan, df.shape[1]),
                dtypes=df.dtypes,
                index_value=parse_index(empty_index, df, self.mask),
                columns_value=df.columns_value,
            )

        # Column selection: a list of labels gives a DataFrame, a single
        # label gives a Series unless it resolves to several dtypes.
        requested_list = isinstance(self.col_names, list)
        selected = self.col_names if requested_list else [self.col_names]
        dtypes_list = df._get_dtypes_by_columns(selected)

        if not requested_list and len(dtypes_list) <= 1:
            return self.new_series(
                [df],
                shape=(df.shape[0],),
                dtype=dtypes_list[0],
                index_value=df.index_value,
                name=self.col_names,
            )

        if len(selected) != len(dtypes_list):
            # Presumably a label expanded to several columns; ask the frame
            # for the resolved labels so names and dtypes line up - TODO
            # confirm against ``_get_columns_by_columns``.
            selected = df._get_columns_by_columns(selected)
        return self.new_dataframe(
            [df],
            shape=(df.shape[0], len(selected)),
            dtypes=pd.Series(dtypes_list, index=selected, dtype=np.dtype("O")),
            index_value=df.index_value,
            columns_value=parse_index(pd.Index(selected), store_data=True),
        )


_list_like_types = (list, np.ndarray, SERIES_TYPE, pd.Series, TENSOR_TYPE)


def _index_by_slice(obj, slc):
    """Apply ``slc`` through ``obj.iloc`` or ``obj.loc``.

    The first non-``None`` bound among ``start`` and ``stop`` decides: an
    integral bound selects ``iloc``, anything else (including both bounds
    being ``None``) selects ``loc``.
    """
    bound = slc.stop if slc.start is None else slc.start
    indexer = obj.iloc if isinstance(bound, Integral) else obj.loc
    return indexer[slc]


def dataframe_getitem(df, item):
    """Implement ``df[item]``.

    Dispatch by the type of ``item``:

    * non-boolean ``np.ndarray`` / ``pd.Series`` are first turned into lists;
    * ``slice`` is forwarded to ``df.iloc`` or ``df.loc``;
    * ``list`` selects columns and yields a DataFrame;
    * other list-likes, or anything with a ``dtypes`` attribute, are used as
      a mask;
    * anything else is taken as a single column label.

    Raises:
        KeyError: if a requested column label is not among ``df``'s columns.
    """
    columns_set = set(df.dtypes.keys())

    if isinstance(item, (np.ndarray, pd.Series)) and item.dtype != np.bool_:
        item = item.tolist()

    if isinstance(item, slice):
        return _index_by_slice(df, item)

    if isinstance(item, list):
        for label in item:
            if label not in columns_set:
                raise KeyError(f"{label} not in columns")
        return DataFrameIndex(col_names=item, output_types=[OutputType.dataframe])(df)

    if isinstance(item, _list_like_types) or hasattr(item, "dtypes"):
        # NB: don't enforce the dtype of `item` to be `bool` since it may be unknown
        identical_index = (
            df.index_value.key == item.index_value.key
            if isinstance(item, DATAFRAME_TYPE + SERIES_TYPE)
            else False
        )
        mask_op = DataFrameIndex(
            mask=item,
            identical_index=identical_index,
            output_types=[OutputType.dataframe],
        )
        return mask_op(df)

    if item not in columns_set:
        raise KeyError(f"{item} not in columns {columns_set}")
    return DataFrameIndex(col_names=item)(df)


def series_getitem(series, labels):
    """Implement ``series[labels]``.

    Lists, tuples and scalars are looked up through ``SeriesIndex``; boolean
    list-likes go through ``series.loc``; slices go through ``iloc`` or
    ``loc`` depending on the type of their first non-``None`` bound.

    Raises:
        NotImplementedError: for any other kind of ``labels``.
    """
    if isinstance(labels, (list, tuple)) or np.isscalar(labels):
        return SeriesIndex(labels=labels)(series, name=series.name)

    if isinstance(labels, _list_like_types) and astensor(labels).dtype == np.bool_:
        return series.loc[labels]

    if isinstance(labels, slice):
        return _index_by_slice(series, labels)

    raise NotImplementedError(f"type {type(labels)} is not support for getitem")