core/maxframe/dataframe/utils.py
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import inspect
import itertools
import logging
import operator
import sys
from contextlib import contextmanager
from numbers import Integral
from typing import TYPE_CHECKING, Any, Callable, List
import numpy as np
import pandas as pd
from pandas.api.extensions import ExtensionDtype
from pandas.api.types import is_string_dtype
from pandas.core.dtypes.inference import is_dict_like, is_list_like
from ..core import Entity, ExecutableTuple
from ..lib.mmh3 import hash as mmh_hash
from ..udf import MarkedFunction
from ..utils import (
ModulePlaceholder,
is_full_slice,
lazy_import,
parse_version,
sbytes,
tokenize,
)
try:
import pyarrow as pa
except ImportError: # pragma: no cover
pa = ModulePlaceholder("pyarrow")
if TYPE_CHECKING:
from .operators import DataFrameOperator
cudf = lazy_import("cudf", rename="cudf")
vineyard = lazy_import("vineyard")
try:
import ray
ray_release_version = parse_version(ray.__version__).release
ray_deprecate_ml_dataset = ray_release_version[:2] >= (2, 0)
except ImportError:
ray_release_version = None
ray_deprecate_ml_dataset = None
logger = logging.getLogger(__name__)
try:
from pandas import ArrowDtype
except ImportError:
ArrowDtype = None
def hash_index(index, size):
def func(x, size):
return mmh_hash(sbytes(x)) % size
f = functools.partial(func, size=size)
idx_to_grouped = index.groupby(index.map(f))
return [idx_to_grouped.get(i, list()) for i in range(size)]
def hash_dataframe_on(df, on, size, level=None):
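    """
    Hash the rows of ``df`` into ``size`` buckets and return a list of ``size``
    positional indexes, one per bucket. The hash key is the index of ``df``
    (optionally restricted to ``level``) when ``on`` is None, ``on`` mapped over
    the index when ``on`` is callable, or the given column(s) otherwise.
    """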
if on is None:
idx = df.index
if level is not None:
idx = idx.to_frame(False)[level]
if cudf and isinstance(idx, cudf.Index): # pragma: no cover
idx = idx.to_pandas()
hashed_label = pd.util.hash_pandas_object(idx, categorize=False)
elif callable(on):
        # TODO: optimization can be added if ``on`` is a numpy ufunc or something that can be vectorized
hashed_label = pd.util.hash_pandas_object(df.index.map(on), categorize=False)
else:
if isinstance(on, list):
to_concat = []
for v in on:
if isinstance(v, pd.Series):
to_concat.append(v)
else:
to_concat.append(df[v])
data = pd.concat(to_concat, axis=1)
else:
data = df[on]
hashed_label = pd.util.hash_pandas_object(data, index=False, categorize=False)
idx_to_grouped = pd.RangeIndex(0, len(hashed_label)).groupby(hashed_label % size)
return [idx_to_grouped.get(i, pd.Index([])) for i in range(size)]
def hash_dtypes(dtypes, size):
hashed_indexes = hash_index(dtypes.index, size)
return [dtypes[index] for index in hashed_indexes]
def sort_dataframe_inplace(df, *axis):
for ax in axis:
df.sort_index(axis=ax, inplace=True)
return df
@functools.lru_cache(1)
def _get_range_index_type():
if cudf is not None:
return pd.RangeIndex, cudf.RangeIndex
else:
return pd.RangeIndex
@functools.lru_cache(1)
def _get_multi_index_type():
if cudf is not None:
return pd.MultiIndex, cudf.MultiIndex
else:
return pd.MultiIndex
def _get_range_index_start(pd_range_index):
try:
return pd_range_index.start
except AttributeError: # pragma: no cover
return pd_range_index._start
def _get_range_index_stop(pd_range_index):
try:
return pd_range_index.stop
except AttributeError: # pragma: no cover
return pd_range_index._stop
def _get_range_index_step(pd_range_index):
try:
return pd_range_index.step
except AttributeError: # pragma: no cover
pass
try: # pragma: no cover
return pd_range_index._step
except AttributeError: # pragma: no cover
return 1 # cudf does not support step arg
def is_pd_range_empty(pd_range_index):
start, stop, step = (
_get_range_index_start(pd_range_index),
_get_range_index_stop(pd_range_index),
_get_range_index_step(pd_range_index),
)
return (start >= stop and step >= 0) or (start <= stop and step < 0)
def parse_index(index_value, *args, store_data=False, key=None):
from .core import IndexValue
def _extract_property(index, tp, ret_data):
kw = {
"_min_val": _get_index_min(index),
"_max_val": _get_index_max(index),
"_min_val_close": True,
"_max_val_close": True,
"_key": key or _tokenize_index(index, *args),
}
if ret_data:
kw["_data"] = index.values
for field in tp._FIELDS:
if field in kw or field == "_data":
continue
val = getattr(index, field.lstrip("_"), None)
if val is not None:
kw[field] = val
return kw
def _tokenize_index(index, *token_objects):
if not index.empty:
return tokenize(index)
else:
return tokenize(index, *token_objects)
def _get_index_min(index):
try:
return index.min()
except (ValueError, AttributeError):
if isinstance(index, pd.IntervalIndex):
return None
raise
except TypeError:
return None
def _get_index_max(index):
try:
return index.max()
except (ValueError, AttributeError):
if isinstance(index, pd.IntervalIndex):
return None
raise
except TypeError:
return None
def _serialize_index(index):
tp = getattr(IndexValue, type(index).__name__)
properties = _extract_property(index, tp, store_data)
properties["_name"] = index.name
return tp(**properties)
def _serialize_range_index(index):
if is_pd_range_empty(index):
properties = {
"_is_monotonic_increasing": True,
"_is_monotonic_decreasing": False,
"_is_unique": True,
"_min_val": _get_index_min(index),
"_max_val": _get_index_max(index),
"_min_val_close": True,
"_max_val_close": False,
"_key": key or _tokenize_index(index, *args),
"_name": index.name,
"_dtype": index.dtype,
}
else:
properties = _extract_property(index, IndexValue.RangeIndex, False)
return IndexValue.RangeIndex(
_slice=slice(
_get_range_index_start(index),
_get_range_index_stop(index),
_get_range_index_step(index),
),
**properties,
)
def _serialize_multi_index(index):
kw = _extract_property(index, IndexValue.MultiIndex, store_data)
kw["_sortorder"] = index.sortorder
kw["_dtypes"] = [lev.dtype for lev in index.levels]
return IndexValue.MultiIndex(**kw)
if index_value is None:
return IndexValue(
_index_value=IndexValue.Index(
_is_monotonic_increasing=False,
_is_monotonic_decreasing=False,
_is_unique=False,
_min_val=None,
_max_val=None,
_min_val_close=True,
_max_val_close=True,
_key=key or tokenize(*args),
)
)
if hasattr(index_value, "to_pandas"): # pragma: no cover
# convert cudf.Index to pandas
index_value = index_value.to_pandas()
if isinstance(index_value, _get_range_index_type()):
return IndexValue(_index_value=_serialize_range_index(index_value))
elif isinstance(index_value, _get_multi_index_type()):
return IndexValue(_index_value=_serialize_multi_index(index_value))
else:
return IndexValue(_index_value=_serialize_index(index_value))
def gen_unknown_index_value(index_value, *args, normalize_range_index=False):
"""
    Generate a new index value with the same characteristics as the given
    index_value and args, but without any data.

    Parameters
    ----------
    index_value
        Given index value.
    args
        Arguments for parse_index.
    normalize_range_index
        Whether to normalize a range index into a normal index.

    Returns
    -------
    Newly created index value.
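
    Examples
    --------
    An illustrative sketch (assumes a plain ``RangeIndex`` input; the internal
    key depends on tokenization, so the output is skipped here):

    >>> idx_value = parse_index(pd.RangeIndex(10))
    >>> gen_unknown_index_value(idx_value).to_pandas().empty  # doctest: +SKIP
    True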
"""
pd_index = index_value.to_pandas()
if not normalize_range_index and isinstance(pd_index, pd.RangeIndex):
return parse_index(pd.RangeIndex(-1, name=pd_index.name), *args)
elif not isinstance(pd_index, pd.MultiIndex):
return parse_index(
pd.Index([], dtype=pd_index.dtype, name=pd_index.name), *args
)
else:
i = pd.MultiIndex.from_arrays(
[c[:0] for c in pd_index.levels], names=pd_index.names
)
return parse_index(i, *args)
def split_monotonic_index_min_max(
left_min_max, left_increase, right_min_max, right_increase
):
"""
    Split the original two min_max lists into new ones. Each min_max should be a
    list in which each item is a 4-tuple of (min value, whether the min value is
    closed, max value, whether the max value is closed).
    The return value is a pair of nested lists (one for the left side and one for
    the right); each inner list describes how the corresponding chunk should be
    split.

    :param left_min_max: the left min_max
    :param left_increase: whether the original data of left is increasing
    :param right_min_max: the right min_max
    :param right_increase: whether the original data of right is increasing
    :return: nested lists in which each item indicates how the min_max is split

>>> left_min_max = [(0, True, 3, True), (4, True, 8, True), (12, True, 18, True),
... (20, True, 22, True)]
>>> right_min_max = [(2, True, 6, True), (7, True, 9, True), (10, True, 14, True),
... (18, True, 19, True)]
>>> l, r = split_monotonic_index_min_max(left_min_max, True, right_min_max, True)
>>> l
[[(0, True, 2, False), (2, True, 3, True)], [(3, False, 4, False), (4, True, 6, True), (6, False, 7, False),
(7, True, 8, True)], [(8, False, 9, True), (10, True, 12, False), (12, True, 14, True), (14, False, 18, False),
    (18, True, 18, True)], [(18, False, 19, True), (20, True, 22, True)]]
>>> r
[[(0, True, 2, False), (2, True, 3, True), (3, False, 4, False), (4, True, 6, True)],
[(6, False, 7, False), (7, True, 8, True), (8, False, 9, True)], [(10, True, 12, False), (12, True, 14, True)],
    [(14, False, 18, False), (18, True, 18, True), (18, False, 19, True), (20, True, 22, True)]]
"""
left_idx_to_min_max = [[] for _ in left_min_max]
right_idx_to_min_max = [[] for _ in right_min_max]
left_curr_min_max = list(left_min_max[0])
right_curr_min_max = list(right_min_max[0])
left_curr_idx = right_curr_idx = 0
left_terminate = right_terminate = False
while not left_terminate or not right_terminate:
if left_terminate:
left_idx_to_min_max[left_curr_idx].append(tuple(right_curr_min_max))
right_idx_to_min_max[right_curr_idx].append(tuple(right_curr_min_max))
if right_curr_idx + 1 >= len(right_min_max):
right_terminate = True
else:
right_curr_idx += 1
right_curr_min_max = list(right_min_max[right_curr_idx])
elif right_terminate:
right_idx_to_min_max[right_curr_idx].append(tuple(left_curr_min_max))
left_idx_to_min_max[left_curr_idx].append(tuple(left_curr_min_max))
if left_curr_idx + 1 >= len(left_min_max):
left_terminate = True
else:
left_curr_idx += 1
left_curr_min_max = list(left_min_max[left_curr_idx])
elif left_curr_min_max[0] < right_curr_min_max[0]:
# left min < right min
right_min = [right_curr_min_max[0], not right_curr_min_max[1]]
max_val = min(left_curr_min_max[2:], right_min)
assert len(max_val) == 2
min_max = (
left_curr_min_max[0],
left_curr_min_max[1],
max_val[0],
max_val[1],
)
left_idx_to_min_max[left_curr_idx].append(min_max)
right_idx_to_min_max[right_curr_idx].append(min_max)
if left_curr_min_max[2:] == max_val:
# left max < right min
if left_curr_idx + 1 >= len(left_min_max):
left_terminate = True
else:
left_curr_idx += 1
left_curr_min_max = list(left_min_max[left_curr_idx])
else:
# from left min(left min close) to right min(exclude right min close)
left_curr_min_max[:2] = right_curr_min_max[:2]
elif left_curr_min_max[0] > right_curr_min_max[0]:
# left min > right min
left_min = [left_curr_min_max[0], not left_curr_min_max[1]]
max_val = min(right_curr_min_max[2:], left_min)
min_max = (
right_curr_min_max[0],
right_curr_min_max[1],
max_val[0],
max_val[1],
)
left_idx_to_min_max[left_curr_idx].append(min_max)
right_idx_to_min_max[right_curr_idx].append(min_max)
if right_curr_min_max[2:] == max_val:
# right max < left min
if right_curr_idx + 1 >= len(right_min_max):
right_terminate = True
else:
right_curr_idx += 1
right_curr_min_max = list(right_min_max[right_curr_idx])
else:
                # from right min(right min close) to left min(exclude left min close)
right_curr_min_max[:2] = left_curr_min_max[:2]
else:
# left min == right min
max_val = min(left_curr_min_max[2:], right_curr_min_max[2:])
assert len(max_val) == 2
min_max = (
left_curr_min_max[0],
left_curr_min_max[1],
max_val[0],
max_val[1],
)
left_idx_to_min_max[left_curr_idx].append(min_max)
right_idx_to_min_max[right_curr_idx].append(min_max)
if max_val == left_curr_min_max[2:]:
if left_curr_idx + 1 >= len(left_min_max):
left_terminate = True
else:
left_curr_idx += 1
left_curr_min_max = list(left_min_max[left_curr_idx])
else:
left_curr_min_max[:2] = max_val[0], not max_val[1]
if max_val == right_curr_min_max[2:]:
if right_curr_idx + 1 >= len(right_min_max):
right_terminate = True
else:
right_curr_idx += 1
right_curr_min_max = list(right_min_max[right_curr_idx])
else:
right_curr_min_max[:2] = max_val[0], not max_val[1]
if left_increase is False:
left_idx_to_min_max = list(reversed(left_idx_to_min_max))
if right_increase is False:
right_idx_to_min_max = list(reversed(right_idx_to_min_max))
return left_idx_to_min_max, right_idx_to_min_max
def build_split_idx_to_origin_idx(splits, increase=True):
    # the length of splits equals the original chunk count on the specified axis;
    # splits is something like [[(0, True, 2, True), (2, False, 3, True)]],
    # which means there is one input chunk that will be split into 2 output chunks.
    # In this function, we build a dict mapping each output chunk index to
    # the original chunk index and the inner position, like {0: (0, 0), 1: (0, 1)}
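    # An illustrative sketch: with two input chunks where the first is split in
    # two (a, b and c are placeholder 4-tuples) and increase=False, the origin
    # order is reversed:
    #   build_split_idx_to_origin_idx([[a, b], [c]], increase=False)
    #   -> {0: (1, 0), 1: (0, 0), 2: (0, 1)}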
if increase is False:
splits = list(reversed(splits))
out_idx = itertools.count(0)
res = dict()
for origin_idx, _ in enumerate(splits):
for pos in range(len(splits[origin_idx])):
if increase is False:
o_idx = len(splits) - origin_idx - 1
else:
o_idx = origin_idx
res[next(out_idx)] = o_idx, pos
return res
def _generate_value(dtype, fill_value):
if ArrowDtype and isinstance(dtype, pd.ArrowDtype):
return _generate_value(dtype.pyarrow_dtype, fill_value)
if isinstance(dtype, pa.ListType):
return [_generate_value(dtype.value_type, fill_value)]
if isinstance(dtype, pa.MapType):
return [
(
_generate_value(dtype.key_type, fill_value),
_generate_value(dtype.item_type, fill_value),
)
]
if isinstance(dtype, pa.DataType):
return _generate_value(dtype.to_pandas_dtype(), fill_value)
    # special handling for datetime64 and timedelta64
dispatch = {
np.datetime64: pd.Timestamp,
np.timedelta64: pd.Timedelta,
pd.CategoricalDtype.type: lambda x: pd.CategoricalDtype([x]),
# for object, we do not know the actual dtype,
# just convert to str for common usage
np.object_: lambda x: str(fill_value),
}
# otherwise, just use dtype.type itself to convert
target_dtype = getattr(dtype, "type", dtype)
convert = dispatch.get(target_dtype, target_dtype)
return convert(fill_value)
def build_empty_df(dtypes, index=None):
columns = dtypes.index
length = len(index) if index is not None else 0
record = [[_generate_value(dtype, 1) for dtype in dtypes]] * max(1, length)
# duplicate column may exist,
# so use RangeIndex first
df = pd.DataFrame(record, columns=range(len(dtypes)), index=index)
for i, dtype in enumerate(dtypes):
s = df.iloc[:, i]
if not pd.api.types.is_dtype_equal(s.dtype, dtype):
df.iloc[:, i] = s.astype(dtype)
df.columns = columns
return df[:length] if len(df) > length else df
def build_df(df_obj, fill_value=1, size=1, ensure_string=False):
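    """
    Build a small mock pandas DataFrame that mimics the dtypes and index dtype of
    ``df_obj``, filled with values derived from ``fill_value`` and containing
    ``size`` rows. ``size`` and ``fill_value`` may also be lists of equal length,
    in which case one block is built per pair and the blocks are concatenated.
    When ``ensure_string`` is True, values of object columns are prefixed with "O".
    """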
dfs = []
if not isinstance(size, (list, tuple)):
sizes = [size]
else:
sizes = size
if not isinstance(fill_value, (list, tuple)):
fill_values = [fill_value]
else:
fill_values = fill_value
from .core import INDEX_TYPE, SERIES_TYPE
dtypes = (
pd.Series([df_obj.dtype], index=[df_obj.name])
if isinstance(df_obj, (INDEX_TYPE, SERIES_TYPE))
else df_obj.dtypes
)
for size, fill_value in zip(sizes, fill_values):
record = [[_generate_value(dtype, fill_value) for dtype in dtypes]] * size
df = pd.DataFrame(record)
df.columns = dtypes.index
if len(record) != 0: # columns is empty in some cases
target_index = df_obj.index_value.to_pandas()
if isinstance(target_index, pd.MultiIndex):
index_val = tuple(
_generate_value(level.dtype, fill_value)
for level in target_index.levels
)
df.index = pd.MultiIndex.from_tuples(
[index_val] * size, names=target_index.names
)
else:
index_val = _generate_value(target_index.dtype, fill_value)
df.index = pd.Index([index_val] * size, name=target_index.name)
# make sure dtypes correct
for i, dtype in enumerate(dtypes):
s = df.iloc[:, i]
if not pd.api.types.is_dtype_equal(s.dtype, dtype):
df[df.columns[i]] = s.astype(dtype)
dfs.append(df)
if len(dfs) == 1:
ret_df = dfs[0]
else:
ret_df = pd.concat(dfs)
if ensure_string:
obj_dtypes = dtypes[dtypes == np.dtype("O")]
ret_df[obj_dtypes.index] = ret_df[obj_dtypes.index].radd("O")
return ret_df
def build_empty_series(dtype, index=None, name=None):
length = len(index) if index is not None else 0
return pd.Series(
[_generate_value(dtype, 1) for _ in range(length)],
dtype=dtype,
index=index,
name=name,
)
def build_series(
series_obj=None,
fill_value=1,
size=1,
name=None,
ensure_string=False,
dtype=None,
index=None,
):
seriess = []
if not isinstance(size, (list, tuple)):
sizes = [size]
else:
sizes = size
if not isinstance(fill_value, (list, tuple)):
fill_values = [fill_value]
else:
fill_values = fill_value
if series_obj is not None:
dtype = series_obj.dtype
try:
series_index = series_obj.index_value.to_pandas()[:0]
except AttributeError:
series_index = series_obj.index[:0]
else:
series_index = index[:0] if index is not None else None
for size, fill_value in zip(sizes, fill_values):
empty_series = build_empty_series(dtype, name=name, index=series_index)
record = _generate_value(dtype, fill_value)
if isinstance(empty_series.index, pd.MultiIndex):
index = tuple(
_generate_value(level.dtype, fill_value)
for level in empty_series.index.levels
)
empty_series = empty_series.reindex(
index=pd.MultiIndex.from_tuples([index], names=empty_series.index.names)
)
empty_series.iloc[0] = record
else:
if isinstance(empty_series.index.dtype, pd.CategoricalDtype):
index = None
else:
index = _generate_value(empty_series.index.dtype, fill_value)
empty_series.loc[index] = record
empty_series = pd.concat([empty_series] * size)
# make sure dtype correct for MultiIndex
empty_series = empty_series.astype(dtype, copy=False)
seriess.append(empty_series)
if len(seriess) == 1:
ret_series = seriess[0]
else:
ret_series = pd.concat(seriess)
if ensure_string and dtype == np.dtype("O"):
ret_series = ret_series.radd("O")
return ret_series
def infer_index_value(left_index_value, right_index_value, level=None):
from .core import IndexValue
if isinstance(left_index_value.value, IndexValue.RangeIndex) and isinstance(
right_index_value.value, IndexValue.RangeIndex
):
if left_index_value.value.slice == right_index_value.value.slice:
return left_index_value
return parse_index(
pd.Index([], dtype=np.int64), left_index_value, right_index_value
)
    # when the left and right indexes are identical and both of them are unique,
    # we can infer that the output index is identical as well
if (
left_index_value.is_unique
and right_index_value.is_unique
and left_index_value.key == right_index_value.key
):
return left_index_value
left_index = left_index_value.to_pandas()
right_index = right_index_value.to_pandas()
out_index = left_index.join(right_index, level=level)[:0]
return parse_index(out_index, left_index_value, right_index_value)
def indexing_index_value(index_value, indexes, store_data=False, rechunk=False):
pd_index = index_value.to_pandas()
    # when rechunk is True, the output index shall be treated
    # differently from the input one
if not rechunk and isinstance(indexes, slice) and is_full_slice(indexes):
return index_value
elif not index_value.has_value():
new_index_value = parse_index(pd_index, indexes, store_data=store_data)
new_index_value._index_value._min_val = index_value.min_val
new_index_value._index_value._min_val_close = index_value.min_val_close
new_index_value._index_value._max_val = index_value.max_val
new_index_value._index_value._max_val_close = index_value.max_val_close
return new_index_value
else:
if isinstance(indexes, Integral):
return parse_index(pd_index[[indexes]], store_data=store_data)
elif isinstance(indexes, Entity):
if isinstance(pd_index, pd.RangeIndex):
return parse_index(
pd.RangeIndex(-1), indexes, index_value, store_data=False
)
else:
return parse_index(
type(pd_index)([]), indexes, index_value, store_data=False
)
if isinstance(indexes, tuple):
return parse_index(pd_index[list(indexes)], store_data=store_data)
else:
return parse_index(pd_index[indexes], store_data=store_data)
def merge_index_value(to_merge_index_values: dict, store_data: bool = False):
"""
    Merge index values according to their chunk index.

    Parameters
    ----------
    to_merge_index_values : dict
        Mapping from chunk index to index_value.
    store_data : bool
        Whether to store data in the merged index_value.
Returns
-------
merged_index_value
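
    Examples
    --------
    An illustrative sketch (assumes contiguous range indexes, which are merged
    into a single range; the result is an internal index value object):

    >>> merged = merge_index_value({
    ...     (0,): parse_index(pd.RangeIndex(0, 3)),
    ...     (1,): parse_index(pd.RangeIndex(3, 5)),
    ... })
    >>> merged.to_pandas()  # doctest: +SKIP
    RangeIndex(start=0, stop=5, step=1)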
"""
pd_index = None
min_val, min_val_close, max_val, max_val_close = None, None, None, None
for _, chunk_index_value in sorted(to_merge_index_values.items()):
if pd_index is None:
pd_index = chunk_index_value.to_pandas()
min_val, min_val_close, max_val, max_val_close = (
chunk_index_value.min_val,
chunk_index_value.min_val_close,
chunk_index_value.max_val,
chunk_index_value.max_val_close,
)
else:
cur_pd_index = chunk_index_value.to_pandas()
if store_data or (
isinstance(pd_index, pd.RangeIndex)
and isinstance(cur_pd_index, pd.RangeIndex)
and cur_pd_index.step == pd_index.step
and cur_pd_index.start == pd_index.stop
):
# range index that is continuous
pd_index = pd_index.append(cur_pd_index)
else:
pd_index = pd.Index([], dtype=pd_index.dtype)
if chunk_index_value.min_val is not None:
try:
if min_val is None or min_val > chunk_index_value.min_val:
min_val = chunk_index_value.min_val
min_val_close = chunk_index_value.min_val_close
except TypeError:
                # the min values have different types that cannot be compared,
                # so just stop comparing
continue
if chunk_index_value.max_val is not None:
if max_val is None or max_val < chunk_index_value.max_val:
max_val = chunk_index_value.max_val
max_val_close = chunk_index_value.max_val_close
index_value = parse_index(pd_index, store_data=store_data)
if not index_value.has_value():
index_value._index_value._min_val = min_val
index_value._index_value._min_val_close = min_val_close
index_value._index_value._max_val = max_val
index_value._index_value._max_val_close = max_val_close
return index_value
def infer_dtypes(left_dtypes, right_dtypes, operator):
left = build_empty_df(left_dtypes)
right = build_empty_df(right_dtypes)
return operator(left, right).dtypes
@functools.lru_cache(100)
def infer_dtype(left_dtype, right_dtype, operator):
left = build_empty_series(left_dtype)
right = build_empty_series(right_dtype)
return operator(left, right).dtype
def filter_dtypes(dtypes, column_min_max):
left_filter = operator.ge if column_min_max[1] else operator.gt
left = left_filter(dtypes.index, column_min_max[0])
right_filter = operator.le if column_min_max[3] else operator.lt
right = right_filter(dtypes.index, column_min_max[2])
return dtypes[left & right]
def in_range_index(i, pd_range_index):
"""
Check whether the input `i` is within `pd_range_index` which is a pd.RangeIndex.
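
    Examples
    --------
    Simple membership checks on a step-2 range:

    >>> in_range_index(4, pd.RangeIndex(0, 10, 2))
    True
    >>> in_range_index(5, pd.RangeIndex(0, 10, 2))
    False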
"""
start, stop, step = (
_get_range_index_start(pd_range_index),
_get_range_index_stop(pd_range_index),
_get_range_index_step(pd_range_index),
)
if step > 0 and start <= i < stop and (i - start) % step == 0:
return True
if step < 0 and start >= i > stop and (start - i) % step == 0:
return True
return False
def wrap_notimplemented_exception(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except NotImplementedError:
return NotImplemented
return wrapper
def validate_axis(axis, tileable=None):
if axis == "index":
axis = 0
elif axis == "columns":
axis = 1
illegal = False
try:
axis = operator.index(axis)
if axis < 0 or (tileable is not None and axis >= tileable.ndim):
illegal = True
except TypeError:
illegal = True
if illegal:
raise ValueError(f"No axis named {axis} for object type {type(tileable)}")
return axis
def validate_axis_style_args(
data, args, kwargs, arg_name, method_name
): # pragma: no cover
"""Argument handler for mixed index, columns / axis functions
In an attempt to handle both `.method(index, columns)`, and
`.method(arg, axis=.)`, we have to do some bad things to argument
parsing. This translates all arguments to `{index=., columns=.}` style.
Parameters
----------
data : DataFrame
args : tuple
All positional arguments from the user
kwargs : dict
All keyword arguments from the user
arg_name, method_name : str
Used for better error messages
Returns
-------
kwargs : dict
A dictionary of keyword arguments. Doesn't modify ``kwargs``
inplace, so update them with the return value here.
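
    Examples
    --------
    An illustrative sketch (a bare pandas DataFrame stands in for the tileable,
    since only ``ndim`` is used here):

    >>> validate_axis_style_args(pd.DataFrame(), ("a",), {"axis": 1}, "mapper", "rename")
    {'columns': 'a'}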
"""
out = {}
# Goal: fill 'out' with index/columns-style arguments
# like out = {'index': foo, 'columns': bar}
# Start by validating for consistency
axes_names = ["index"] if data.ndim == 1 else ["index", "columns"]
if "axis" in kwargs and any(x in kwargs for x in axes_names):
msg = "Cannot specify both 'axis' and any of 'index' or 'columns'."
raise TypeError(msg)
# First fill with explicit values provided by the user...
if arg_name in kwargs:
if args:
msg = f"{method_name} got multiple values for argument '{arg_name}'"
raise TypeError(msg)
axis = axes_names[validate_axis(kwargs.get("axis", 0), data)]
out[axis] = kwargs[arg_name]
# More user-provided arguments, now from kwargs
for k, v in kwargs.items():
try:
ax = axes_names[validate_axis(k, data)]
except ValueError:
pass
else:
out[ax] = v
# All user-provided kwargs have been handled now.
    # Now we supplement with positional arguments, raising
    # when there is ambiguity or a conflict
if len(args) == 0:
pass # It's up to the function to decide if this is valid
elif len(args) == 1:
axis = axes_names[validate_axis(kwargs.get("axis", 0), data)]
out[axis] = args[0]
elif len(args) == 2:
if "axis" in kwargs:
# Unambiguously wrong
msg = "Cannot specify both 'axis' and any of 'index' or 'columns'"
raise TypeError(msg)
msg = (
"Interpreting call\n\t'.{method_name}(a, b)' as "
"\n\t'.{method_name}(index=a, columns=b)'.\nUse named "
"arguments to remove any ambiguity."
)
raise TypeError(msg.format(method_name=method_name))
else:
msg = f"Cannot specify all of '{arg_name}', 'index', 'columns'."
raise TypeError(msg)
return out
def validate_output_types(**kwargs):
from ..core import OutputType
output_type = kwargs.pop("object_type", None) or kwargs.pop("output_type", None)
output_types = kwargs.pop("output_types", None) or (
[output_type] if output_type is not None else None
)
return (
[
getattr(OutputType, v.lower()) if isinstance(v, str) else v
for v in output_types
]
if output_types
else None
)
def fetch_corner_data(df_or_series, session=None) -> pd.DataFrame:
"""
    Fetch corner data of a DataFrame or Series for repr usage.

    :param df_or_series: DataFrame or Series
    :param session: session used to fetch data
    :return: corner DataFrame or Series
"""
from .indexing.iloc import iloc
max_rows = pd.get_option("display.max_rows")
try:
min_rows = pd.get_option("display.min_rows")
min_rows = min(min_rows, max_rows)
except KeyError: # pragma: no cover
# display.min_rows is introduced in pandas 0.25
min_rows = max_rows
index_size = None
if (
df_or_series.shape[0] > max_rows
and df_or_series.shape[0] > min_rows // 2 * 2 + 2
):
# for pandas, greater than max_rows
# will display min_rows
# thus we fetch min_rows + 2 lines
index_size = min_rows // 2 + 1
if index_size is None:
return df_or_series._fetch(session=session)
else:
head = iloc(df_or_series)[:index_size]
tail = iloc(df_or_series)[-index_size:]
head_data, tail_data = ExecutableTuple([head, tail]).fetch(session=session)
xdf = cudf if head.op.is_gpu() else pd
return xdf.concat([head_data, tail_data], axis="index")
class ReprSeries(pd.Series):
def __init__(self, corner_data, real_shape):
super().__init__(corner_data)
self._real_shape = real_shape
def __len__(self):
        # As we only fetch corner data for repr, len() of the fetched data
        # would be wrong and we have no way to control it, thus we overwrite
        # the length to report the real one
return self._real_shape[0]
def filter_dtypes_by_index(dtypes, index):
try:
new_dtypes = dtypes.loc[index].dropna()
except KeyError:
dtypes_idx = (
dtypes.index.to_frame()
.merge(index.to_frame())
.set_index(list(range(dtypes.index.nlevels)))
.index
)
new_dtypes = dtypes.loc[dtypes_idx]
new_dtypes.index.names = dtypes.index.names
return new_dtypes
@contextmanager
def create_sa_connection(con, **kwargs):
import sqlalchemy as sa
from sqlalchemy.engine import Connection, Engine
# process con
engine = None
if isinstance(con, Connection):
        # connection created by user
close = False
dispose = False
elif isinstance(con, Engine):
con = con.connect()
close = True
dispose = False
else:
engine = sa.create_engine(con, **kwargs)
con = engine.connect()
close = True
dispose = True
try:
yield con
finally:
if close:
con.close()
if dispose:
engine.dispose()
def to_arrow_dtypes(dtypes, test_df=None):
from .arrays import ArrowStringDtype
new_dtypes = dtypes.copy()
for i in range(len(dtypes)):
dtype = dtypes.iloc[i]
if is_string_dtype(dtype):
if test_df is not None:
series = test_df.iloc[:, i]
# check value
non_na_series = series[series.notna()]
if len(non_na_series) > 0:
first_value = non_na_series.iloc[0]
if isinstance(first_value, str):
new_dtypes.iloc[i] = ArrowStringDtype()
else: # pragma: no cover
# empty, set arrow string dtype
new_dtypes.iloc[i] = ArrowStringDtype()
else:
# empty, set arrow string dtype
new_dtypes.iloc[i] = ArrowStringDtype()
return new_dtypes
def make_dtype(dtype):
if isinstance(dtype, (np.dtype, ExtensionDtype)):
return dtype
return np.dtype(dtype) if dtype is not None else None
def make_dtypes(dtypes):
if dtypes is None:
return None
if not isinstance(dtypes, pd.Series):
dtypes = pd.Series(dtypes)
return dtypes.apply(make_dtype)
def is_dataframe(x):
if cudf is not None: # pragma: no cover
if isinstance(x, cudf.DataFrame):
return True
return isinstance(x, pd.DataFrame)
def is_series(x):
if cudf is not None: # pragma: no cover
if isinstance(x, cudf.Series):
return True
return isinstance(x, pd.Series)
def is_index(x):
if cudf is not None: # pragma: no cover
if isinstance(x, cudf.Index):
return True
return isinstance(x, pd.Index)
def get_xdf(x):
if cudf is not None: # pragma: no cover
if isinstance(x, (cudf.DataFrame, cudf.Series, cudf.Index)):
return cudf
return pd
def is_cudf(x):
if cudf is not None: # pragma: no cover
if isinstance(x, (cudf.DataFrame, cudf.Series, cudf.Index)):
return True
return False
def whether_to_clean_up(op, threshold):
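    """
    Estimate the size of the data captured by ``op.func`` (closure cells,
    ``__dict__`` and ``__slots__`` attributes, traversed to a bounded depth) and
    set ``op.need_clean_up_func`` to True once the estimate reaches ``threshold``.
    Returns the resulting flag.
    """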
func = op.func
counted_bytes = 0
max_recursion_depth = 2
from collections import deque
from numbers import Number
BYPASS_CLASSES = (str, bytes, Number, range, bytearray, pd.DataFrame, pd.Series)
class GetSizeEarlyStopException(Exception):
pass
def check_exceed_threshold():
nonlocal threshold, counted_bytes
if counted_bytes >= threshold:
raise GetSizeEarlyStopException()
def getsize(obj_outer):
_seen_obj_ids = set()
def inner_count(obj, recursion_depth):
obj_id = id(obj)
if obj_id in _seen_obj_ids or recursion_depth > max_recursion_depth:
return 0
_seen_obj_ids.add(obj_id)
recursion_depth += 1
size = sys.getsizeof(obj)
if isinstance(obj, BYPASS_CLASSES):
return size
elif isinstance(obj, (tuple, list, set, deque)):
size += sum(inner_count(i, recursion_depth) for i in obj)
elif hasattr(obj, "items"):
size += sum(
inner_count(k, recursion_depth) + inner_count(v, recursion_depth)
for k, v in getattr(obj, "items")()
)
if hasattr(obj, "__dict__"):
size += inner_count(vars(obj), recursion_depth)
if hasattr(obj, "__slots__"):
size += sum(
inner_count(getattr(obj, s), recursion_depth)
for s in obj.__slots__
if hasattr(obj, s)
)
return size
return inner_count(obj_outer, 0)
try:
        # Note: in most cases, func is just a function with a closure, but it may
        # also be a callable object without a __closure__ attribute.
if inspect.isclass(func):
pass
elif hasattr(func, "__closure__") and func.__closure__ is not None:
for cell in func.__closure__:
counted_bytes += getsize(cell.cell_contents)
check_exceed_threshold()
elif callable(func):
if hasattr(func, "__dict__"):
for k, v in func.__dict__.items():
counted_bytes += sum([getsize(k), getsize(v)])
check_exceed_threshold()
if hasattr(func, "__slots__"):
for slot in func.__slots__:
counted_bytes += (
getsize(getattr(func, slot)) if hasattr(func, slot) else 0
)
check_exceed_threshold()
except GetSizeEarlyStopException:
logger.debug("Func needs cleanup.")
op.need_clean_up_func = True
else:
assert op.need_clean_up_func is False
logger.debug("Func doesn't need cleanup.")
return op.need_clean_up_func
def concat_on_columns(objs: List) -> Any:
xdf = get_xdf(objs[0])
    # In cudf, concat with axis=1 and ignore_index=False by default behaves opposite
    # to pandas: cudf resets the index when axis=1 and ignore_index=False, which does
    # not match its documentation. Therefore, we handle this case specially.
result = xdf.concat(objs, axis=1)
if xdf is cudf:
result.index = objs[0].index
return result
def apply_if_callable(maybe_callable, obj, **kwargs):
if callable(maybe_callable):
return maybe_callable(obj, **kwargs)
return maybe_callable
def patch_sa_engine_execute():
"""
    pandas has not resolved the compatibility issue with SQLAlchemy 2.0 (see
    https://github.com/pandas-dev/pandas/issues/40686). We patch the removed
    ``execute`` method back onto SQLAlchemy's ``Engine`` class so that our code
    keeps working.
"""
try:
from sqlalchemy.engine import Engine
except ImportError: # pragma: no cover
return
def execute(self, statement, *multiparams, **params):
connection = self.connect()
return connection.execute(statement, *multiparams, **params)
if hasattr(Engine, "execute"): # pragma: no cover
return
Engine.execute = execute
def bind_func_args_from_pos(func, args_bind_position, *bound_args, **bound_kwargs):
"""
    Create a new function with arguments bound starting at a specified position.
Parameters
----------
func : callable
Target function to be wrapped.
args_bind_position : int
Position to start binding arguments (0-based).
        e.g., ``args_bind_position=0`` inserts the bound arguments before all
        runtime arguments, while ``args_bind_position=1`` inserts them after
        the first runtime argument.
    *bound_args : tuple
        Arguments to be bound starting at ``args_bind_position``.
**bound_kwargs : dict
Keyword arguments to be bound.
Returns
-------
callable
Wrapped function with bound arguments.
Examples
--------
    >>> def func(x, y, z=0):
    ...     return x * y + z
    >>> f = bind_func_args_from_pos(func, 1, 10)  # bind 10 at the second position
    >>> f(5)  # equivalent to func(5, 10)
    50
Raises
------
    TypeError
        If func is not callable or ``args_bind_position`` is not an integer.
    ValueError
        If ``args_bind_position`` is negative or exceeds the number of parameters.
"""
@functools.wraps(func)
def wrapper(*runtime_args, **runtime_kwargs):
try:
# Combine arguments
all_args = (
runtime_args[:args_bind_position]
+ bound_args
+ runtime_args[args_bind_position:]
)
all_kwargs = {**bound_kwargs, **runtime_kwargs}
return func(*all_args, **all_kwargs)
except Exception as e:
# Enhance error message with context
raise type(e)(
f"Error calling {func.__name__} with bound arguments: {str(e)}"
) from e
return wrapper
def pack_func_args(df, funcs, *args, args_bind_position=1, **kwargs) -> Any:
"""
    Pack the funcs with args and kwargs to avoid ambiguity with other positional
    and keyword arguments. The funcs are processed by the following rules:

    1. If there are no such args and kwargs, return funcs itself.
    2. If funcs is a dict-like object, iterate over each key-value pair, pack the
       value recursively, and return a new dict with the same keys and packed
       values.
    3. If funcs is a list-like object, iterate over each element, pack it
       recursively, and return a new list with the packed elements.
    4. If funcs is a str object, first try to get the attribute of that name on
       ``df``; if it exists and is callable, return a wrapped function with args
       and kwargs bound. If it exists but is not callable, a ValueError is raised.
       If it does not exist, try the attribute of the same name on ``np``; if it
       exists and ``df`` is acceptable to it, return a wrapped function with args
       and kwargs bound, otherwise an AttributeError is raised. This rule is
       almost the same as pandas.
    5. Otherwise funcs is treated as a callable and a wrapped function with args
       and kwargs bound is returned.

    Parameters
    ----------
    df : pandas.DataFrame or pandas.Series
        The DataFrame or Series object to test the function.
    funcs : function, str, list-like or dict-like
        Function to pack. It accepts the same types as ``DataFrame.transform()``.
    args_bind_position : int
        Position to start binding arguments (0-based), e.g. 0 inserts the bound
        arguments before all runtime arguments, 1 inserts them after the first
        runtime argument.
    *args :
        The positional arguments to func. If funcs contains many functions, each
        one should be able to accept *args.
    **kwargs :
        The keyword arguments to func. If funcs contains many functions, each one
        should be able to accept **kwargs.

    Returns
    -------
    The packed functions with the same structure as funcs.

    Raises
    ------
    ValueError :
        If funcs contains a string whose corresponding attribute is not callable.
    AttributeError :
        If funcs contains a string for which no corresponding function is found.
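
    Examples
    --------
    A minimal illustrative sketch with a plain callable and one extra positional
    argument:

    >>> def add(x, delta):
    ...     return x + delta
    >>> packed = pack_func_args(pd.Series([1, 2]), add, 10)
    >>> packed(pd.Series([1, 2]))
    0    11
    1    12
    dtype: int64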
"""
if not args and not kwargs:
return funcs
    if is_dict_like(funcs):
        return {
            k: pack_func_args(df, v, *args, args_bind_position=args_bind_position, **kwargs)
            for k, v in funcs.items()
        }
    if is_list_like(funcs):
        return [
            pack_func_args(df, v, *args, args_bind_position=args_bind_position, **kwargs)
            for v in funcs
        ]
f = get_callable_by_name(df, funcs) if isinstance(funcs, str) else funcs
from ..udf import MarkedFunction
if isinstance(f, MarkedFunction):
        # for a marked function, pack the inner function and keep the result marked
packed_func = f.copy()
packed_func.func = bind_func_args_from_pos(
f.func, args_bind_position, *args, **kwargs
)
else:
packed_func = bind_func_args_from_pos(f, args_bind_position, *args, **kwargs)
# Callable
return packed_func
def get_callable_by_name(df: Any, func_name: str) -> Callable:
"""
    Get the callable by the func name.

    It first tries to get the attribute named ``func_name`` on ``df``; if it
    exists and is callable, return it. If it exists but is not callable, a
    ValueError is raised. If it does not exist, try the attribute of the same
    name on ``np``; if it exists and ``df`` supports ``__array__``, return it,
    otherwise an AttributeError is raised. This rule is almost the same as
    pandas.

    Parameters
    ----------
    df : pandas.Series or pandas.DataFrame
        The receiver of the func name.
    func_name : str
        The func name.

    Returns
    -------
    The callable instance.

    Raises
    ------
    ValueError :
        If the attribute exists but is not callable.
    AttributeError :
        If no corresponding function is found.
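
    Examples
    --------
    A couple of simple lookups on a pandas Series:

    >>> callable(get_callable_by_name(pd.Series([1, 2]), "sum"))
    True
    >>> get_callable_by_name(pd.Series([1, 2]), "no_such_func")
    Traceback (most recent call last):
        ...
    AttributeError: 'no_such_func' is not a valid function for 'Series' object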
"""
if hasattr(df, func_name):
f = getattr(df, func_name)
if callable(f):
return f
raise ValueError(f"{func_name} is not a callable")
if hasattr(np, func_name) and hasattr(df, "__array__"):
return getattr(np, func_name)
raise AttributeError(
f"'{func_name}' is not a valid function for '{type(df).__name__}' object"
)
def copy_func_scheduling_hints(func, op: "DataFrameOperator") -> None:
if not isinstance(func, MarkedFunction):
return
if func.expect_engine:
op.expect_engine = func.expect_engine
if func.expect_resources:
op.expect_resources = func.expect_resources