core/maxframe/dataframe/statistics/quantile.py (202 lines of code) (raw):

# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
import pandas as pd
from pandas.core.dtypes.cast import find_common_type

from ... import opcodes
from ...core import ENTITY_TYPE
from ...serialization.serializables import (
    AnyField,
    BoolField,
    DataTypeField,
    Int32Field,
    KeyField,
    StringField,
)
from ...tensor.core import TENSOR_TYPE
from ...tensor.datasource import empty
from ...tensor.datasource import from_series as tensor_from_series
from ...tensor.datasource import tensor as astensor
from ...tensor.statistics.quantile import quantile as tensor_quantile
from ..core import DATAFRAME_TYPE
from ..operators import DataFrameOperator, DataFrameOperatorMixin
from ..utils import build_empty_df, parse_index, validate_axis


class DataFrameQuantile(DataFrameOperator, DataFrameOperatorMixin):
    """
    Operator describing a quantile computation on a DataFrame or a Series.

    Calling the operator builds the output metadata (shape, dtype(s), index
    and columns) for the requested quantile(s). Depending on the input kind
    and on the dimensionality of ``q`` the result is a scalar, a Series or
    a DataFrame.
    """

    _op_type_ = opcodes.QUANTILE

    input = KeyField("input", default=None)
    q = AnyField("q", default=None)
    axis = Int32Field("axis", default=None)
    numeric_only = BoolField("numeric_only", default=None)
    interpolation = StringField("interpolation", default=None)

    dtype = DataTypeField("dtype", default=None)

    def __init__(self, output_types=None, **kw):
        super().__init__(_output_types=output_types, **kw)

    def _set_inputs(self, inputs):
        """Rebind ``input`` (and ``q`` when it is a tensor) to the given inputs."""
        super()._set_inputs(inputs)
        self.input = self._inputs[0]
        if isinstance(self.q, TENSOR_TYPE):
            # __call__ appends a tensor q after the data, so it is the last input.
            self.q = self._inputs[-1]

    def _infer_quantile_dtype(self, data):
        """Return the dtype ``tensor_quantile`` reports for ``data`` with this op's settings."""
        return tensor_quantile(
            data,
            self.q,
            interpolation=self.interpolation,
            handle_non_numeric=not self.numeric_only,
        ).dtype

    def _column_quantile_dtypes(self, a, dtypes):
        """Return the quantile result dtype of every column of ``a`` listed in ``dtypes.index``."""
        return [
            self._infer_quantile_dtype(tensor_from_series(a[col]))
            for col in dtypes.index
        ]

    def _calc_dtype_on_axis_1(self, a, dtypes):
        """
        Return the common dtype of the per-column quantile results.

        Parameters
        ----------
        a : DataFrame
            Input whose columns are inspected one by one.
        dtypes : pandas.Series
            Dtypes of the participating columns; only its index (the column
            labels) is used here.
        """
        return find_common_type(self._column_quantile_dtypes(a, dtypes))

    def _call_dataframe(self, a, inputs):
        """
        Build the output of a quantile over the DataFrame ``a``.

        Parameters
        ----------
        a : DataFrame
            Input data.
        inputs : list
            Operator inputs: ``a``, optionally followed by a tensor ``q``.

        Returns
        -------
        Series or DataFrame
            A Series when ``q`` is 0-dimensional, otherwise a DataFrame whose
            index is built from ``q``.
        """
        if self.numeric_only:
            # Drop non-numeric columns using an empty frame that carries the dtypes.
            empty_df = build_empty_df(a.dtypes)
            dtypes = empty_df._get_numeric_data().dtypes
        else:
            dtypes = a.dtypes

        if isinstance(self.q, TENSOR_TYPE):
            # Values of a tensor q are not available here, so only an empty,
            # correctly typed index can be recorded.
            q_val = self.q
            pd_index = pd.Index([], dtype=q_val.dtype)
            name = None
            store_index_value = False
        else:
            q_val = np.asanyarray(self.q)
            if q_val.ndim == 0:
                pd_index = pd.Index(q_val.reshape(1))
            else:
                pd_index = pd.Index(q_val)
            name = self.q if q_val.size == 1 else None
            store_index_value = True
        tokenize_objects = (a, q_val, self.interpolation, type(self).__name__)

        if q_val.ndim == 0 and self.axis == 0:
            # Scalar q along the rows: one value per column.
            index_value = parse_index(dtypes.index, store_data=store_index_value)
            shape = (len(dtypes),)
            dtype = self._calc_dtype_on_axis_1(a, dtypes)
            return self.new_series(
                inputs,
                shape=shape,
                dtype=dtype,
                index_value=index_value,
                # Explicit None check: a quantile of 0 is falsy but is still
                # the name of the result.
                name=name if name is not None else dtypes.index.name,
            )
        elif q_val.ndim == 0 and self.axis == 1:
            # Scalar q along the columns: one value per row.
            index_value = a.index_value
            shape = (len(a),)
            # A row mixes all columns, so probe with their common dtype.
            dt = self._infer_quantile_dtype(
                empty(a.shape[1], dtype=find_common_type(list(dtypes)))
            )
            return self.new_series(
                inputs,
                shape=shape,
                dtype=dt,
                index_value=index_value,
                # Same explicit None check as above, so q == 0 is kept as name.
                name=name if name is not None else index_value.name,
            )
        elif q_val.ndim == 1 and self.axis == 0:
            # Several quantiles along the rows: rows are q, columns are kept.
            shape = (len(q_val), len(dtypes))
            index_value = parse_index(
                pd_index, *tokenize_objects, store_data=store_index_value
            )
            dtypes = pd.Series(
                self._column_quantile_dtypes(a, dtypes), index=dtypes.index
            )
            return self.new_dataframe(
                inputs,
                shape=shape,
                dtypes=dtypes,
                index_value=index_value,
                columns_value=parse_index(dtypes.index, store_data=True),
            )
        else:
            assert q_val.ndim == 1 and self.axis == 1
            # Several quantiles along the columns: rows are q and the input's
            # row index becomes the columns of the result.
            shape = (len(q_val), a.shape[0])
            index_value = parse_index(
                pd_index, *tokenize_objects, store_data=store_index_value
            )
            pd_columns = a.index_value.to_pandas()
            dtype_list = np.full(len(pd_columns), self._calc_dtype_on_axis_1(a, dtypes))
            dtypes = pd.Series(dtype_list, index=pd_columns)
            return self.new_dataframe(
                inputs,
                shape=shape,
                dtypes=dtypes,
                index_value=index_value,
                columns_value=parse_index(
                    dtypes.index, store_data=True, key=a.index_value.key
                ),
            )

    def _call_series(self, a, inputs):
        """
        Build the output of a quantile over the Series ``a``.

        Parameters
        ----------
        a : Series
            Input data.
        inputs : list
            Operator inputs: ``a``, optionally followed by a tensor ``q``.

        Returns
        -------
        scalar or Series
            A scalar when ``q`` is 0-dimensional, otherwise a Series indexed
            by ``q`` and named after ``a``.
        """
        if isinstance(self.q, TENSOR_TYPE):
            q_val = self.q
            index_val = pd.Index([], dtype=q_val.dtype)
            store_index_value = False
        else:
            q_val = np.asanyarray(self.q)
            if q_val.ndim == 0:
                index_val = pd.Index(q_val.reshape(1))
            else:
                index_val = pd.Index(q_val)
            store_index_value = True

        # get dtype by tensor
        # NOTE(review): this assigns ``_dtype`` while the serializable field
        # declared on the class is ``dtype`` -- confirm which attribute
        # downstream code reads before changing it.
        self._dtype = dtype = self._infer_quantile_dtype(astensor(a))

        if q_val.ndim == 0:
            return self.new_scalar(inputs, dtype=dtype)
        else:
            return self.new_series(
                inputs,
                shape=q_val.shape,
                dtype=dtype,
                index_value=parse_index(
                    index_val,
                    a,
                    q_val,
                    self.interpolation,
                    type(self).__name__,
                    store_data=store_index_value,
                ),
                name=a.name,
            )

    def __call__(self, a, q_input=None):
        """
        Apply the operator to ``a``.

        Parameters
        ----------
        a : DataFrame or Series
            Input data.
        q_input : tensor, optional
            ``q`` when it is a tensor; it is appended to the operator inputs.
        """
        inputs = [a]
        if q_input is not None:
            inputs.append(q_input)
        if isinstance(a, DATAFRAME_TYPE):
            return self._call_dataframe(a, inputs)
        else:
            return self._call_series(a, inputs)


def quantile_series(series, q=0.5, interpolation="linear"):
    """
    Return value at the given quantile.

    Parameters
    ----------
    q : float or array-like, default 0.5 (50% quantile)
        0 <= q <= 1, the quantile(s) to compute.
    interpolation : {'linear', 'lower', 'higher', 'midpoint', 'nearest'}
        This optional parameter specifies the interpolation method to use,
        when the desired quantile lies between two data points `i` and `j`:

            * linear: `i + (j - i) * fraction`, where `fraction` is the
              fractional part of the index surrounded by `i` and `j`.
            * lower: `i`.
            * higher: `j`.
            * nearest: `i` or `j` whichever is nearest.
            * midpoint: (`i` + `j`) / 2.

    Returns
    -------
    float or Series
        If ``q`` is an array or a tensor, a Series will be returned where the
        index is ``q`` and the values are the quantiles, otherwise
        a float will be returned.

    See Also
    --------
    core.window.Rolling.quantile
    numpy.percentile

    Examples
    --------
    >>> import maxframe.dataframe as md
    >>> s = md.Series([1, 2, 3, 4])
    >>> s.quantile(.5).execute()
    2.5
    >>> s.quantile([.25, .5, .75]).execute()
    0.25    1.75
    0.50    2.50
    0.75    3.25
    dtype: float64
    """

    if isinstance(q, ENTITY_TYPE):
        # A lazily evaluated q must be passed to the operator as an input too.
        q = astensor(q)
        q_input = q
    else:
        q_input = None

    op = DataFrameQuantile(q=q, interpolation=interpolation, gpu=series.op.gpu)
    return op(series, q_input=q_input)


def quantile_dataframe(df, q=0.5, axis=0, numeric_only=True, interpolation="linear"):
    # FIXME: Timedelta not support. Data invalid: ODPS-0010000:InvalidArgument:duration[ns] is not equal to string
    """
    Return values at the given quantile over requested axis.

    Parameters
    ----------
    q : float or array-like, default 0.5 (50% quantile)
        Value between 0 <= q <= 1, the quantile(s) to compute.
    axis : {0, 1, 'index', 'columns'} (default 0)
        Equals 0 or 'index' for row-wise, 1 or 'columns' for column-wise.
    numeric_only : bool, default True
        If False, the quantile of datetime and timedelta data will be
        computed as well.
    interpolation : {'linear', 'lower', 'higher', 'midpoint', 'nearest'}
        This optional parameter specifies the interpolation method to use,
        when the desired quantile lies between two data points `i` and `j`:

        * linear: `i + (j - i) * fraction`, where `fraction` is the
          fractional part of the index surrounded by `i` and `j`.
        * lower: `i`.
        * higher: `j`.
        * nearest: `i` or `j` whichever is nearest.
        * midpoint: (`i` + `j`) / 2.

    Returns
    -------
    Series or DataFrame

        If ``q`` is an array or a tensor, a DataFrame will be returned where the
          index is ``q``, the columns are the columns of self, and the
          values are the quantiles.
        If ``q`` is a float, a Series will be returned where the
          index is the columns of self and the values are the quantiles.

    See Also
    --------
    core.window.Rolling.quantile: Rolling quantile.
    numpy.percentile: Numpy function to compute the percentile.

    Examples
    --------
    >>> import maxframe.dataframe as md
    >>> df = md.DataFrame(np.array([[1, 1], [2, 10], [3, 100], [4, 100]]),
    ...                   columns=['a', 'b'])
    >>> df.quantile(.1).execute()
    a    1.3
    b    3.7
    Name: 0.1, dtype: float64

    >>> df.quantile([.1, .5]).execute()
           a     b
    0.1  1.3   3.7
    0.5  2.5  55.0
    """
    if isinstance(q, ENTITY_TYPE):
        # A lazily evaluated q must be passed to the operator as an input too.
        q = astensor(q)
        q_input = q
    else:
        q_input = None
    axis = validate_axis(axis, df)

    op = DataFrameQuantile(
        q=q,
        interpolation=interpolation,
        axis=axis,
        numeric_only=numeric_only,
        gpu=df.op.gpu,
    )
    return op(df, q_input=q_input)