core/maxframe/dataframe/indexing/loc.py

# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from numbers import Integral
from typing import Dict

import numpy as np
import pandas as pd
from pandas.core.dtypes.cast import find_common_type
from pandas.core.indexing import IndexingError

from ... import opcodes
from ...core import ENTITY_TYPE, OutputType
from ...serialization.serializables import AnyField, KeyField, ListField
from ...tensor.datasource import asarray
from ...tensor.utils import calc_sliced_size, filter_inputs
from ...utils import is_full_slice, lazy_import, pd_release_version
from ..core import DATAFRAME_TYPE, IndexValue
from ..operators import DataFrameOperator, DataFrameOperatorMixin
from ..utils import parse_index
from .iloc import DataFrameIlocSetItem

cudf = lazy_import("cudf")
# Index.slice_locs accepted a `kind` argument before pandas 1.4
with_slice_locs_kind = pd_release_version < (1, 4, 0)


def process_loc_indexes(inp, indexes, fetch_index: bool = True):
    ndim = inp.ndim

    if not isinstance(indexes, tuple):
        indexes = (indexes,)
    if len(indexes) < ndim:
        indexes += (slice(None),) * (ndim - len(indexes))
    if len(indexes) > ndim:
        raise IndexingError("Too many indexers")

    new_indexes = []
    for ax, index in enumerate(indexes):
        if isinstance(index, (list, np.ndarray, pd.Series, ENTITY_TYPE)):
            if not isinstance(index, ENTITY_TYPE):
                index = np.asarray(index)
            elif fetch_index:
                index = asarray(index)
                if ax == 1:
                    # do not support tensor index on axis 1
                    # because if so, the dtypes and columns_value would be unknown
                    try:
                        index = index.fetch()
                    except (RuntimeError, ValueError):
                        raise NotImplementedError(
                            "indexer on axis columns cannot be non-executed tensor"
                        )
        new_indexes.append(index)
    return new_indexes


class DataFrameLoc:
    def __init__(self, obj):
        self._obj = obj

    def _use_iloc(self, indexes):
        # for RangeIndex from 0, use iloc instead of loc
        index_value = self._obj.index_value.value
        if len(indexes) == 2:
            if not isinstance(indexes[1], slice):
                return False, None
            elif indexes[1] != slice(None):
                return False, None
        if not isinstance(index_value, IndexValue.RangeIndex):
            return False, None
        if index_value.slice.start != 0 and index_value.slice.start is not None:
            return False, None
        if not isinstance(indexes[0], (Integral, slice)):
            return False, None
        if isinstance(indexes[0], Integral):
            if indexes[0] < 0:
                return False, None
        else:
            index0 = indexes[0]
            for v in (index0.start, index0.stop, index0.step):
                if v is None:
                    continue
                if not isinstance(v, Integral):
                    return False, None
                if v < 0:
                    return False, None
            if index0.stop is not None:
                # adjust slice right bound
                return (
                    True,
                    [slice(index0.start, index0.stop + 1, index0.step)] + indexes[1:],
                )
        return True, None

    def __getitem__(self, indexes):
        indexes = process_loc_indexes(self._obj, indexes)

        use_iloc, new_indexes = self._use_iloc(indexes)
        if use_iloc:
            # use iloc instead
            return self._obj.iloc[tuple(new_indexes or indexes)]

        op = DataFrameLocGetItem(indexes=indexes)
        return op(self._obj)

    def __setitem__(self, indexes, value):
        if not np.isscalar(value):
            raise NotImplementedError("Only scalar value is supported to set by loc")
        if not isinstance(self._obj, DATAFRAME_TYPE):
            raise NotImplementedError("Only DataFrame is supported to set by loc")

        indexes = process_loc_indexes(self._obj, indexes, fetch_index=False)
        use_iloc, new_indexes = self._use_iloc(indexes)
        if use_iloc:
            op = DataFrameIlocSetItem(indexes=new_indexes, value=value)
            ret = op(self._obj)
            self._obj.data = ret.data
        else:
            other_indices = []
            indices_tileable = [
                idx
                for idx in indexes
                if isinstance(idx, ENTITY_TYPE) or other_indices.append(idx)
            ]
            op = DataFrameLocSetItem(indexes=other_indices, value=value)
            ret = op([self._obj] + indices_tileable)
            self._obj.data = ret.data


class DataFrameLocSetItem(DataFrameOperator, DataFrameOperatorMixin):
    _op_type_ = opcodes.DATAFRAME_ILOC_SETITEM

    indexes = ListField("indexes", default=None)
    value = AnyField("value", default=None)

    def __init__(self, gpu=None, sparse=False, output_types=None, **kw):
        super().__init__(
            gpu=gpu,
            sparse=sparse,
            _output_types=output_types,
            **kw,
        )
        if not self.output_types:
            self.output_types = [OutputType.dataframe]

    def __call__(self, inputs):
        df = inputs[0]
        return self.new_dataframe(
            inputs,
            shape=df.shape,
            dtypes=df.dtypes,
            index_value=df.index_value,
            columns_value=df.columns_value,
        )


class DataFrameLocGetItem(DataFrameOperator, DataFrameOperatorMixin):
    _op_type_ = opcodes.DATAFRAME_LOC_GETITEM

    _input = KeyField("input")
    indexes = ListField("indexes", default=None)

    def __init__(self, gpu=None, sparse=False, output_types=None, **kw):
        super().__init__(gpu=gpu, sparse=sparse, _output_types=output_types, **kw)

    @property
    def input(self):
        return self._input

    @property
    def can_index_miss(self):
        return False

    def _set_inputs(self, inputs):
        super()._set_inputs(inputs)
        inputs_iter = iter(self._inputs)
        self._input = next(inputs_iter)
        indexes = []
        for index in self.indexes:
            if isinstance(index, ENTITY_TYPE):
                indexes.append(next(inputs_iter))
            else:
                indexes.append(index)
        self.indexes = list(indexes)

    @classmethod
    def _calc_slice_param(
        cls,
        input_index_value: IndexValue,
        pd_index: pd.Index,
        inp,
        index: slice,
        axis: int,
    ) -> Dict:
        param = dict()
        if is_full_slice(index):
            # full slice on this axis
            param["shape"] = inp.shape[axis]
            param["index_value"] = input_index_value
            if axis == 1:
                param["dtypes"] = inp.dtypes
        elif input_index_value.has_value():
            kw = {}
            if with_slice_locs_kind:
                kw["kind"] = "loc"
            start, end = pd_index.slice_locs(index.start, index.stop, index.step, **kw)
            slc = slice(start, end, index.step)
            size = calc_sliced_size(inp.shape[axis], slc)
            param["shape"] = size
            out_index = pd_index[slc]
            param["index_value"] = parse_index(out_index, store_data=axis == 1)
            if axis == 1:
                param["dtypes"] = inp.dtypes[slc]
        else:
            assert axis == 0
            if index.start is None and index.stop is None:
                param["shape"] = calc_sliced_size(inp.shape[axis], index)
            else:
                param["shape"] = np.nan
            param["index_value"] = parse_index(pd_index, inp, index)

        return param

    @classmethod
    def _calc_bool_index_param(
        cls, input_index_value: IndexValue, pd_index: pd.Index, inp, index, axis: int
    ) -> Dict:
        param = dict()
        if input_index_value.has_value():
            if isinstance(index, np.ndarray):
                filtered_index = pd_index[index]
                param["shape"] = len(filtered_index)
                param["index_value"] = parse_index(filtered_index, store_data=axis == 1)
                if axis == 1:
                    param["dtypes"] = inp.dtypes[index]
            else:
                # tensor, cannot be indexer on axis 1
                assert axis == 0
                param["shape"] = np.nan
                param["index_value"] = parse_index(
                    pd.Index([], dtype=pd_index.dtype), inp, index, store_data=False
                )
        else:
            assert axis == 0
            if isinstance(index, np.ndarray):
                param["shape"] = int(index.sum())
            else:
                param["shape"] = np.nan
            param["index_value"] = parse_index(pd_index, inp, index, store_data=False)

        return param

    @classmethod
    def _calc_fancy_index_param(
        cls, input_index_value: IndexValue, pd_index: pd.Index, inp, index, axis: int
    ) -> Dict:
        param = dict()
        if input_index_value.has_value():
            if isinstance(index, np.ndarray):
                if not pd_index.is_unique:
                    assert axis == 1
                    # as there's no direct method in pandas to handle fancy indexes
                    # on a non-unique index, use dtypes.loc to build the result dtypes
                    new_dtypes = inp.dtypes.loc[index]
                    param["shape"] = len(new_dtypes)
                    param["index_value"] = parse_index(
                        new_dtypes.index, store_data=True
                    )
                    param["dtypes"] = new_dtypes
                else:
                    for it in index:
                        if it not in pd_index:
                            axis_name = "index" if axis == 0 else "columns"
                            raise KeyError(
                                f"Label [{it}] not found in the [{axis_name}]"
                            )
                    param["shape"] = len(index)
                    param["index_value"] = parse_index(pd.Index(index), store_data=True)
                    if axis == 1:
                        param["dtypes"] = inp.dtypes[index]
            else:
                assert axis == 0
                param["shape"] = index.shape[0]
                param["index_value"] = parse_index(
                    pd.Index([], dtype=pd_index.dtype), inp, index
                )
        else:
            assert axis == 0
            param["shape"] = index.shape[0]
            param["index_value"] = parse_index(pd_index, inp, index)

        return param

    @classmethod
    def _calc_param(cls, inp, axis: int, index) -> Dict:
        input_index_value = inp.index_value if axis == 0 else inp.columns_value
        pd_index = input_index_value.to_pandas()

        if isinstance(index, slice):
            return cls._calc_slice_param(input_index_value, pd_index, inp, index, axis)
        elif hasattr(index, "dtype") and index.ndim == 1:
            if index.dtype == np.bool_:
                # bool indexing
                return cls._calc_bool_index_param(
                    input_index_value, pd_index, inp, index, axis
                )
            else:
                # fancy indexing
                return cls._calc_fancy_index_param(
                    input_index_value, pd_index, inp, index, axis
                )
        else:
            param = dict()
            if input_index_value.has_value():
                loc = pd_index.get_loc(index)
                if isinstance(loc, (slice, np.ndarray)):
                    assert axis == 1
                    new_dtypes = inp.dtypes[loc]
                    param["shape"] = len(new_dtypes)
                    param["index_value"] = parse_index(
                        new_dtypes.index, store_data=True
                    )
                    param["dtypes"] = new_dtypes
                else:
                    # append None to indicate returning Series
                    param["shape"] = None
            else:
                param["shape"] = None
            return param

    def __call__(self, inp):
        inputs = [inp] + filter_inputs(self.indexes)

        shape = []
        sizes = []
        index_value = columns_value = dtypes = None
        for ax, index in enumerate(self.indexes):
            param = self._calc_param(inp, ax, index)

            size = param.get("shape")
            sizes.append(size)
            if size is not None:
                shape.append(size)

            if ax == 0:
                index_value = param.get("index_value")
            else:
                columns_value = param.get("index_value")
                dtypes = param.get("dtypes")

        shape = tuple(shape)
        if len(shape) == 0:
            # scalar
            if isinstance(inp, DATAFRAME_TYPE):
                dtype = inp.dtypes[self.indexes[1]]
            else:
                dtype = inp.dtype
            return self.new_scalar(inputs, dtype=dtype)
        elif len(shape) == 1:
            # series
            if isinstance(inp, DATAFRAME_TYPE):
                if sizes[0] is None:
                    # label on axis 0
                    dtype = find_common_type(list(dtypes))
                    return self.new_series(
                        inputs,
                        shape=shape,
                        dtype=dtype,
                        index_value=columns_value,
                        name=self.indexes[0],
                    )
                else:
                    # label on axis 1
                    dtype = inp.dtypes[self.indexes[1]]
                    return self.new_series(
                        inputs,
                        shape=shape,
                        dtype=dtype,
                        index_value=index_value,
                        name=self.indexes[1],
                    )
            else:
                return self.new_series(
                    inputs,
                    shape=shape,
                    dtype=inp.dtype,
                    index_value=index_value,
                    name=inp.name,
                )
        else:
            # dataframe
            return self.new_dataframe(
                inputs,
                shape=shape,
                dtypes=dtypes,
                index_value=index_value,
                columns_value=columns_value,
            )


def loc(a):
    return DataFrameLoc(a)