# core/maxframe/dataframe/arrays.py

# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import operator
import re
from copy import copy as copy_obj
from numbers import Integral
from typing import Sequence, Type

import numpy as np
import pandas as pd
from pandas._libs import lib
from pandas.api.extensions import (
    ExtensionArray,
    ExtensionDtype,
    register_extension_dtype,
)
from pandas.api.indexers import check_array_indexer
from pandas.api.types import (
    is_array_like,
    is_list_like,
    is_scalar,
    is_string_dtype,
    pandas_dtype,
)
from pandas.arrays import StringArray as StringArrayBase
from pandas.compat import set_function_name
from pandas.core import ops
from pandas.core.algorithms import take

try:
    from pandas._libs.arrays import NDArrayBacked
except ImportError:
    NDArrayBacked = None
try:
    import pyarrow as pa

    pa_null = pa.NULL
except ImportError:  # pragma: no cover
    pa = None
    pa_null = None
try:
    import pyarrow.compute as pc
except ImportError:  # pragma: no cover
    pc = None

from ..config import options
from ..core import is_kernel_mode
from ..utils import pd_release_version, tokenize

_use_bool_any_all = pd_release_version[:2] >= (1, 3)
_use_extension_index = pd_release_version[:2] >= (1, 4)
_object_engine_for_string_array = pd_release_version[:2] >= (1, 5)

if _object_engine_for_string_array:
    StringArrayBase = type(StringArrayBase)(
        "StringArrayBase", StringArrayBase.__bases__, dict(StringArrayBase.__dict__)
    )


class ArrowDtype(ExtensionDtype):
    @property
    def arrow_type(self):  # pragma: no cover
        raise NotImplementedError

    def __from_arrow__(self, array):
        return self.construct_array_type()(array)
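
# ``__from_arrow__`` above is the hook pandas uses to rebuild an extension
# array from arrow data. A minimal round-trip sketch, assuming pyarrow is
# installed and using the concrete string dtype defined below (the
# ``types_mapper`` dict is illustrative, not part of this module):
#
#     >>> import pyarrow as pa
#     >>> tbl = pa.table({"s": pa.array(["a", "b", None])})
#     >>> df = tbl.to_pandas(types_mapper={pa.string(): ArrowStringDtype()}.get)
#     >>> df["s"].array          # an ArrowStringArray backed by arrow data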


@register_extension_dtype
class ArrowStringDtype(ArrowDtype):
    """
    Extension dtype for arrow string data.

    .. warning::

       ArrowStringDtype is considered experimental. The implementation and
       parts of the API may change without warning.

       In particular, ArrowStringDtype.na_value may change to no longer be
       ``numpy.nan``.

    Attributes
    ----------
    None

    Methods
    -------
    None

    Examples
    --------
    >>> import maxframe.dataframe as md
    >>> md.ArrowStringDtype()
    ArrowStringDtype
    """

    type = str
    kind = "U"
    name = "Arrow[string]"
    na_value = pa_null

    @classmethod
    def construct_from_string(cls, string):
        if string == cls.name:
            return cls()
        else:
            raise TypeError(f"Cannot construct a '{cls}' from '{string}'")

    @classmethod
    def construct_array_type(cls) -> "Type[ArrowStringArray]":
        return ArrowStringArray

    @property
    def arrow_type(self):
        return pa.string()


@register_extension_dtype
class ArrowStringDtypeAlias(ArrowStringDtype):
    name = "arrow_string"  # register an alias name for compatibility


class ArrowListDtypeType(type):
    """
    the type of ArrowListDtype, this metaclass determines subclass ability
    """

    pass


class ArrowListDtype(ArrowDtype):
    _metadata = ("_value_type",)

    def __init__(self, dtype):
        if isinstance(dtype, type(self)):
            dtype = dtype.value_type
        if pa and isinstance(dtype, pa.DataType):
            dtype = dtype.to_pandas_dtype()
        dtype = pandas_dtype(dtype)
        if is_string_dtype(dtype) and not isinstance(dtype, ArrowStringDtype):
            # convert string dtype to arrow string dtype
            dtype = ArrowStringDtype()
        self._value_type = dtype

    @property
    def value_type(self):
        return self._value_type

    @property
    def kind(self):
        return "O"

    @property
    def type(self):
        return ArrowListDtypeType

    @property
    def name(self):
        return f"Arrow[List[{self.value_type.name}]]"

    @property
    def arrow_type(self):
        if isinstance(self._value_type, ArrowDtype):
            arrow_subdtype = self._value_type.arrow_type
        else:
            arrow_subdtype = pa.from_numpy_dtype(self._value_type)
        return pa.list_(arrow_subdtype)

    def __repr__(self) -> str:
        return self.name

    @classmethod
    def construct_array_type(cls) -> "Type[ArrowListArray]":
        return ArrowListArray

    @classmethod
    def construct_from_string(cls, string):
        msg = f"Cannot construct a 'ArrowListDtype' from '{string}'"
        xpr = re.compile(r"Arrow\[List\[(?P<value_type>[^,]*)\]\]$")
        m = xpr.match(string)
        if m:
            value_type = m.groupdict()["value_type"]
            return ArrowListDtype(value_type)
        else:
            raise TypeError(msg)

    @classmethod
    def is_dtype(cls, dtype) -> bool:
        dtype = getattr(dtype, "dtype", dtype)
        if isinstance(dtype, str):
            try:
                cls.construct_from_string(dtype)
            except TypeError:
                return False
            else:
                return True
        else:
            return isinstance(dtype, cls)

    def __hash__(self):
        return super().__hash__()

    def __eq__(self, other):
        if not isinstance(other, ArrowListDtype):
            return False

        value_type = self._value_type
        other_value_type = other._value_type

        try:
            return value_type == other_value_type
        except TypeError:
            # cannot compare numpy dtype and extension dtype
            return other_value_type == value_type
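
# A hedged usage sketch of the list dtype and its string round-trip
# (``np`` comes from the module imports; assumes pyarrow is installed):
#
#     >>> ArrowListDtype("int64").name
#     'Arrow[List[int64]]'
#     >>> ArrowListDtype.construct_from_string("Arrow[List[int64]]") == ArrowListDtype(np.int64)
#     True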


class ArrowArray(ExtensionArray):
    _arrow_type = None

    def __init__(self, values, dtype: ArrowDtype = None, copy=False):
        pandas_only = self._pandas_only()

        if pa is not None and not pandas_only:
            self._init_by_arrow(values, dtype=dtype, copy=copy)
        elif not is_kernel_mode():
            # not in kernel mode, allow to use numpy handle data
            # just for infer dtypes purpose
            self._init_by_numpy(values, dtype=dtype, copy=copy)
        else:
            raise ImportError("Cannot create ArrowArray when `pyarrow` not installed")

        # for test purpose
        self._force_use_pandas = pandas_only

    def _init_by_arrow(self, values, dtype: ArrowDtype = None, copy=False):
        if isinstance(values, (pd.Index, pd.Series)):
            # for pandas Index and Series,
            # convert to PandasArray
            values = values.array

        if isinstance(values, type(self)):
            arrow_array = values._arrow_array
        elif isinstance(values, ExtensionArray):
            # if come from pandas object like index,
            # convert to pandas StringArray first,
            # validation will be done in construct
            arrow_array = pa.chunked_array([pa.array(values, from_pandas=True)])
        elif isinstance(values, pa.ChunkedArray):
            arrow_array = values
        elif isinstance(values, pa.Array):
            arrow_array = pa.chunked_array([values])
        elif len(values) == 0:  # pragma: no cover
            arrow_array = pa.chunked_array([pa.array([], type=dtype.arrow_type)])
        else:
            arrow_array = pa.chunked_array([pa.array(values, type=dtype.arrow_type)])

        if copy:
            arrow_array = copy_obj(arrow_array)

        self._use_arrow = True
        self._arrow_array = arrow_array

        if NDArrayBacked is not None and isinstance(self, NDArrayBacked):
            NDArrayBacked.__init__(self, np.array([]), dtype)
        else:
            self._dtype = dtype

    def _init_by_numpy(self, values, dtype: ArrowDtype = None, copy=False):
        self._use_arrow = False

        ndarray = np.array(values, copy=copy)
        if NDArrayBacked is not None and isinstance(self, NDArrayBacked):
            NDArrayBacked.__init__(self, ndarray, dtype)
        else:
            self._dtype = dtype
        self._ndarray = np.array(values, copy=copy)

    @classmethod
    def _pandas_only(cls):
        return options.dataframe.arrow_array.pandas_only

    def __repr__(self):
        return f"{type(self).__name__}({repr(self._array)})"

    @property
    def _array(self):
        return self._arrow_array if self._use_arrow else self._ndarray

    @property
    def dtype(self) -> "Type[ArrowDtype]":
        return self._dtype

    @property
    def nbytes(self) -> int:
        if self._use_arrow:
            return sum(
                x.size
                for chunk in self._arrow_array.chunks
                for x in chunk.buffers()
                if x is not None
            )
        else:
            return self._ndarray.nbytes

    @property
    def shape(self):
        if self._use_arrow:
            return (self._arrow_array.length(),)
        else:
            return self._ndarray.shape

    def memory_usage(self, deep=True) -> int:
        if self._use_arrow:
            return self.nbytes
        else:
            return pd.Series(self._ndarray).memory_usage(index=False, deep=deep)

    @classmethod
    def _to_arrow_array(cls, scalars):
        return pa.array(scalars)

    @classmethod
    def _from_sequence(cls, scalars, dtype=None, copy=False):
        if pa is None or cls._pandas_only():
            # pyarrow not installed, just return numpy
            ret = np.empty(len(scalars), dtype=object)
            ret[:] = scalars
            return cls(ret)

        if pa_null is not None and isinstance(scalars, type(pa_null)):
            scalars = []
        elif not hasattr(scalars, "dtype"):
            ret = np.empty(len(scalars), dtype=object)
            for i, s in enumerate(scalars):
                ret[i] = s
            scalars = ret
        elif isinstance(scalars, cls):
            if copy:
                scalars = scalars.copy()
            return scalars
        arrow_array = pa.chunked_array([cls._to_arrow_array(scalars)])
        return cls(arrow_array, dtype=dtype, copy=copy)

    @classmethod
    def _from_sequence_of_strings(cls, strings, dtype=None, copy=False):
        return cls._from_sequence(strings, dtype=dtype, copy=copy)

    @staticmethod
    def _can_process_slice_via_arrow(slc):
        if not isinstance(slc, slice):
            return False
        if slc.step is not None and slc.step != 1:
            return False
        if slc.start is not None and not isinstance(
            slc.start, Integral
        ):  # pragma: no cover
            return False
        if slc.stop is not None and not isinstance(
            slc.stop, Integral
        ):  # pragma: no cover
            return False
        return True

    def _values_for_factorize(self):
        arr = self.to_numpy()
        mask = self.isna()
        arr[mask] = -1
        return arr, -1

    def _values_for_argsort(self):
        return self.to_numpy()

    @classmethod
    def _from_factorized(cls, values, original):
        return cls(values)

    @staticmethod
    def _process_pos(pos, length, is_start):
        if pos is None:
            return 0 if is_start else length
        return pos + length if pos < 0 else pos

    @classmethod
    def _post_scalar_getitem(cls, lst):
        return lst.to_pandas()[0]
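
    # ``__getitem__`` below dispatches on the indexer: scalar positions go
    # through arrow ``take``, plain slices through ``slice``, boolean masks
    # through ``filter`` and integer arrays through ``take``; any other
    # indexer (and the no-pyarrow case) falls back to indexing a numpy view
    # of the data.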

    def __getitem__(self, item):
        cls = type(self)
        if pa is None or self._force_use_pandas:
            # pyarrow not installed
            result = self._ndarray[item]
            if pd.api.types.is_scalar(item):
                return result
            else:
                return type(self)(result)

        has_take = hasattr(self._arrow_array, "take")
        if not self._force_use_pandas and has_take:
            if pd.api.types.is_scalar(item):
                item = item + len(self) if item < 0 else item
                return self._post_scalar_getitem(self._arrow_array.take([item]))
            elif self._can_process_slice_via_arrow(item):
                length = len(self)
                start, stop = item.start, item.stop
                start = self._process_pos(start, length, True)
                stop = self._process_pos(stop, length, False)
                return cls(
                    self._arrow_array.slice(offset=start, length=stop - start),
                    dtype=self._dtype,
                )
            elif hasattr(item, "dtype") and np.issubdtype(item.dtype, np.bool_):
                return cls(
                    self._arrow_array.filter(pa.array(item, from_pandas=True)),
                    dtype=self._dtype,
                )
            elif hasattr(item, "dtype"):
                length = len(self)
                item = np.where(item < 0, item + length, item)
                return cls(self._arrow_array.take(item), dtype=self._dtype)

        array = np.asarray(self._arrow_array.to_pandas())
        return cls(array[item], dtype=self._dtype)

    @classmethod
    def _concat_same_type(cls, to_concat: Sequence["ArrowArray"]) -> "ArrowArray":
        if pa is None or cls._pandas_only():
            # pyarrow not installed
            return cls(np.concatenate([x._array for x in to_concat]))

        chunks = list(
            itertools.chain.from_iterable(x._arrow_array.chunks for x in to_concat)
        )
        if len(chunks) == 0:
            chunks = [pa.array([], type=to_concat[0].dtype.arrow_type)]
        return cls(pa.chunked_array(chunks))

    def __len__(self):
        return len(self._array)

    def __array__(self, dtype=None):
        return self.to_numpy(dtype=dtype)

    def to_numpy(self, dtype=None, copy=False, na_value=lib.no_default):
        if self._use_arrow:
            array = np.asarray(self._arrow_array.to_pandas())
        else:
            array = self._ndarray
        if copy or na_value is not lib.no_default:
            array = array.copy()
        if na_value is not lib.no_default:
            array[self.isna()] = na_value
        return array

    @classmethod
    def _array_fillna(cls, array, value):
        return array.fillna(value)

    def fillna(self, value=None, method=None, limit=None):
        cls = type(self)
        if pa is None or self._force_use_pandas:
            # pyarrow not installed
            return cls(
                pd.Series(self.to_numpy()).fillna(
                    value=value, method=method, limit=limit
                )
            )

        chunks = []
        for chunk_array in self._arrow_array.chunks:
            array = chunk_array.to_pandas()
            if method is None:
                result_array = self._array_fillna(array, value)
            else:
                result_array = array.fillna(value=value, method=method, limit=limit)
            chunks.append(pa.array(result_array, from_pandas=True))
        return cls(pa.chunked_array(chunks), dtype=self._dtype)
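
    # ``astype`` below infers the destination array type by converting a
    # one-element slice first, pre-allocates a full-length result filled with
    # that dtype's NA value, then converts and writes each arrow chunk into
    # the matching slice.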

    def astype(self, dtype, copy=True):
        dtype = pandas_dtype(dtype)
        if isinstance(dtype, ArrowStringDtype):
            if copy:
                return self.copy()
            return self

        if pa is None or self._force_use_pandas:
            # pyarrow not installed
            if isinstance(dtype, ArrowDtype):
                dtype = dtype.type
            return type(self)(pd.Series(self.to_numpy()).astype(dtype, copy=copy))

        # try to slice 1 record to get the result dtype
        test_array = self._arrow_array.slice(0, 1).to_pandas()
        test_result_array = test_array.astype(dtype).array
        if _use_extension_index:
            test_result_type = type(test_array.astype(dtype).values)
            if test_result_type is np.ndarray:
                test_result_type = np.array
        else:
            test_result_type = type(test_result_array)

        result_array = test_result_type(
            np.full(
                self.shape,
                test_result_array.dtype.na_value,
                dtype=np.asarray(test_result_array).dtype,
            )
        )

        start = 0
        # use chunks to do astype
        for chunk_array in self._arrow_array.chunks:
            result_array[start : start + len(chunk_array)] = (
                chunk_array.to_pandas().astype(dtype).array
            )
            start += len(chunk_array)
        return result_array

    def isna(self):
        if (
            not self._force_use_pandas
            and self._use_arrow
            and hasattr(self._arrow_array, "is_null")
        ):
            return self._arrow_array.is_null().to_pandas().to_numpy()
        elif self._use_arrow:
            return pd.isna(self._arrow_array.to_pandas()).to_numpy()
        else:
            return pd.isna(self._ndarray)

    def take(self, indices, allow_fill=False, fill_value=None):
        if (
            allow_fill is False
            or (allow_fill and fill_value is self.dtype.na_value)
        ) and len(self) > 0:
            return type(self)(self[indices], dtype=self._dtype)

        if self._use_arrow:
            array = self._arrow_array.to_pandas().to_numpy()
        else:
            array = self._ndarray

        replace = False
        if allow_fill and (fill_value is None or fill_value == self._dtype.na_value):
            fill_value = self.dtype.na_value
            replace = True

        result = take(array, indices, fill_value=fill_value, allow_fill=allow_fill)
        del array
        if replace and pa is not None:
            # pyarrow cannot recognize pa.NULL
            result[result == self.dtype.na_value] = None
        return type(self)(result, dtype=self._dtype)

    def copy(self):
        if self._use_arrow:
            return type(self)(copy_obj(self._arrow_array))
        else:
            return type(self)(self._ndarray.copy())

    def unique(self):
        if self._force_use_pandas or not self._use_arrow or not hasattr(pc, "unique"):
            return type(self)(np.unique(self.to_numpy()), dtype=self._dtype)
        return type(self)(pc.unique(self._arrow_array), dtype=self._dtype)

    def value_counts(self, dropna=False):
        if self._use_arrow:
            series = self._arrow_array.to_pandas()
        else:
            series = pd.Series(self._ndarray)
        return type(self)(series.value_counts(dropna=dropna), dtype=self._dtype)

    if _use_bool_any_all:

        def any(self, axis=0, out=None):
            return self.to_numpy().astype(bool).any(axis=axis, out=out)

        def all(self, axis=0, out=None):
            return self.to_numpy().astype(bool).all(axis=axis, out=out)

    else:

        def any(self, axis=0, out=None):
            return self.to_numpy().any(axis=axis, out=out)

        def all(self, axis=0, out=None):
            return self.to_numpy().all(axis=axis, out=out)

    def __maxframe_tokenize__(self):
        if self._use_arrow:
            return tokenize(
                [
                    memoryview(x)
                    for chunk in self._arrow_array.chunks
                    for x in chunk.buffers()
                    if x is not None
                ]
            )
        else:
            return self._ndarray
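
# A hedged sketch of the backend switch driven by ``_pandas_only`` above,
# assuming pyarrow is installed and the option is settable at runtime
# (``ArrowStringArray`` is defined just below):
#
#     >>> from maxframe.config import options
#     >>> options.dataframe.arrow_array.pandas_only = True
#     >>> ArrowStringArray(["a", "b"])._use_arrow    # numpy-backed fallback
#     False
#     >>> options.dataframe.arrow_array.pandas_only = False
#     >>> ArrowStringArray(["a", "b"])._use_arrow    # pa.ChunkedArray-backed
#     True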


class ArrowStringArray(ArrowArray, StringArrayBase):
    def __init__(self, values, dtype=None, copy=False):
        if dtype is not None:
            assert isinstance(dtype, ArrowStringDtype)
        ArrowArray.__init__(self, values, ArrowStringDtype(), copy=copy)

    @classmethod
    def from_scalars(cls, values):
        if pa is None or cls._pandas_only():
            return cls._from_sequence(values)
        else:
            arrow_array = pa.chunked_array([cls._to_arrow_array(values)])
            return cls(arrow_array)

    @classmethod
    def _to_arrow_array(cls, scalars):
        return pa.array(scalars).cast(pa.string())

    def __setitem__(self, key, value):
        if isinstance(value, (pd.Index, pd.Series)):
            value = value.to_numpy()
        if isinstance(value, type(self)):
            value = value.to_numpy()

        key = check_array_indexer(self, key)
        scalar_key = is_scalar(key)
        scalar_value = is_scalar(value)
        if scalar_key and not scalar_value:
            raise ValueError("setting an array element with a sequence.")

        # validate new items
        if scalar_value:
            if pd.isna(value):
                value = None
            elif not isinstance(value, str):
                raise ValueError(
                    f"Cannot set non-string value '{value}' into a ArrowStringArray."
                )
        else:
            if not is_array_like(value):
                value = np.asarray(value, dtype=object)
            if len(value) and not lib.is_string_array(value, skipna=True):
                raise ValueError("Must provide strings.")

        if self._use_arrow:
            string_array = np.asarray(self._arrow_array.to_pandas())
            string_array[key] = value
            self._arrow_array = pa.chunked_array([pa.array(string_array)])
        else:
            self._ndarray[key] = value

    # Override parent because we have different return types.
    @classmethod
    def _create_arithmetic_method(cls, op):
        # Note: this handles both arithmetic and comparison methods.
        def method(self, other):
            is_arithmetic = True if op.__name__ in ops.ARITHMETIC_BINOPS else False

            pandas_only = cls._pandas_only()

            is_other_array = False
            if not is_scalar(other):
                is_other_array = True
                other = np.asarray(other)

            self_is_na = self.isna()
            other_is_na = pd.isna(other)
            mask = self_is_na | other_is_na

            if pa is None or pandas_only:
                if is_arithmetic:
                    ret = np.empty(self.shape, dtype=object)
                else:
                    ret = np.zeros(self.shape, dtype=bool)
                valid = ~mask
                arr = (
                    self._arrow_array.to_pandas().to_numpy()
                    if self._use_arrow
                    else self._ndarray
                )
                o = other[valid] if is_other_array else other
                ret[valid] = op(arr[valid], o)
                if is_arithmetic:
                    return ArrowStringArray(ret)
                else:
                    return pd.arrays.BooleanArray(ret, mask)

            chunks = []
            mask_chunks = []
            start = 0
            for chunk_array in self._arrow_array.chunks:
                chunk_array = np.asarray(chunk_array.to_pandas())
                end = start + len(chunk_array)
                chunk_mask = mask[start:end]
                chunk_valid = ~chunk_mask
                if is_arithmetic:
                    result = np.empty(chunk_array.shape, dtype=object)
                else:
                    result = np.zeros(chunk_array.shape, dtype=bool)
                chunk_other = other
                if is_other_array:
                    chunk_other = other[start:end]
                    chunk_other = chunk_other[chunk_valid]
                # calculate only for both not None
                result[chunk_valid] = op(chunk_array[chunk_valid], chunk_other)
                if is_arithmetic:
                    chunks.append(pa.array(result, type=pa.string(), from_pandas=True))
                else:
                    chunks.append(result)
                    mask_chunks.append(chunk_mask)

            if is_arithmetic:
                return ArrowStringArray(pa.chunked_array(chunks))
            else:
                return pd.arrays.BooleanArray(
                    np.concatenate(chunks), np.concatenate(mask_chunks)
                )

        return set_function_name(method, f"__{op.__name__}__", cls)

    def shift(self, periods: int = 1, fill_value: object = None) -> "ArrowStringArray":
        return ExtensionArray.shift(self, periods=periods, fill_value=fill_value)

    @classmethod
    def _add_arithmetic_ops(cls):
        cls.__add__ = cls._create_arithmetic_method(operator.add)
        cls.__radd__ = cls._create_arithmetic_method(ops.radd)
        cls.__mul__ = cls._create_arithmetic_method(operator.mul)
        cls.__rmul__ = cls._create_arithmetic_method(ops.rmul)

    @classmethod
    def _add_comparison_ops(cls):
        cls.__eq__ = cls._create_comparison_method(operator.eq)
        cls.__ne__ = cls._create_comparison_method(operator.ne)
        cls.__lt__ = cls._create_comparison_method(operator.lt)
        cls.__gt__ = cls._create_comparison_method(operator.gt)
        cls.__le__ = cls._create_comparison_method(operator.le)
        cls.__ge__ = cls._create_comparison_method(operator.ge)

    _create_comparison_method = _create_arithmetic_method


ArrowStringArray._add_arithmetic_ops()
ArrowStringArray._add_comparison_ops()
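
# A hedged sketch of the operators registered above, assuming pyarrow is
# installed: arithmetic ops return ``ArrowStringArray``, comparison ops return
# ``pd.arrays.BooleanArray`` carrying the combined NA mask.
#
#     >>> arr = ArrowStringArray(["a", "b", None])
#     >>> arr + "_x"    # -> ArrowStringArray(["a_x", "b_x", None])
#     >>> arr == "a"    # -> BooleanArray([True, False, NA])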


class ArrowListArray(ArrowArray):
    def __init__(self, values, dtype: ArrowListDtype = None, copy=False):
        if dtype is None:
            if isinstance(values, type(self)):
                dtype = values.dtype
            elif pa is not None:
                if isinstance(values, pa.Array):
                    dtype = ArrowListDtype(values.type.value_type)
                elif isinstance(values, pa.ChunkedArray):
                    dtype = ArrowListDtype(values.type.value_type)
                else:
                    values = pa.array(values)
                    if values.type == pa.null():
                        dtype = ArrowListDtype(pa.string())
                    else:
                        dtype = ArrowListDtype(values.type.value_type)
            else:
                value_type = np.asarray(values[0]).dtype
                dtype = ArrowListDtype(value_type)
        super().__init__(values, dtype=dtype, copy=copy)

    def to_numpy(self, dtype=None, copy=False, na_value=lib.no_default):
        if self._use_arrow:
            s = self._arrow_array.to_pandas()
        else:
            s = pd.Series(self._ndarray)
        s = s.map(lambda x: x.tolist() if hasattr(x, "tolist") else x)
        if copy or na_value is not lib.no_default:
            s = s.copy()
        if na_value is not lib.no_default:
            s[self.isna()] = na_value
        return np.asarray(s)

    @classmethod
    def _post_scalar_getitem(cls, lst):
        return lst[0].as_py()

    def __setitem__(self, key, value):
        if isinstance(value, (pd.Index, pd.Series)):
            value = value.to_numpy()

        key = check_array_indexer(self, key)
        scalar_key = is_scalar(key)

        # validate new items
        if scalar_key:
            if pd.isna(value):
                value = None
            elif not is_list_like(value):
                raise ValueError("Must provide list.")

        if self._use_arrow:
            array = np.asarray(self._arrow_array.to_pandas())
            array[key] = value
            self._arrow_array = pa.chunked_array(
                [pa.array(array, type=self.dtype.arrow_type)]
            )
        else:
            self._ndarray[key] = value

    @classmethod
    def _array_fillna(cls, series, value):
        # cannot fillna directly, because value is a list-like object
        return series.apply(lambda x: x if is_list_like(x) or not pd.isna(x) else value)

    def astype(self, dtype, copy=True):
        msg = f"cannot astype from {self.dtype} to {dtype}"
        dtype = pandas_dtype(dtype)
        if isinstance(dtype, ArrowListDtype):
            if self.dtype == dtype:
                if copy:
                    return self.copy()
                return self
            else:
                if self._use_arrow:
                    try:
                        arrow_array = self._arrow_array.cast(dtype.arrow_type)
                        return ArrowListArray(arrow_array)
                    except (NotImplementedError, pa.ArrowInvalid):
                        raise TypeError(msg)
                else:

                    def f(x):
                        return pd.Series(x).astype(dtype.value_type.type).tolist()

                    try:
                        arr = pd.Series(self._ndarray)
                        ret = arr.map(f).to_numpy()
                        return ArrowStringArray(ret)
                    except ValueError:
                        raise TypeError(msg)
        try:
            return super().astype(dtype, copy=copy)
        except ValueError:
            raise TypeError(msg)
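
# A hedged end-to-end sketch of the list array, assuming pyarrow is installed
# and the arrow-backed path is enabled:
#
#     >>> lst = ArrowListArray([[1, 2], [3], None])
#     >>> lst.dtype
#     Arrow[List[int64]]
#     >>> lst[0]
#     [1, 2]
#     >>> lst.isna().tolist()
#     [False, False, True]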