odps/df/expr/window.py (312 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .groupby import *
from .reduction import _stats_type
from . import utils, errors
from .. import types
from ...utils import camel_to_underline, object_getattr
class Window(SequenceExpr):
_args = '_input', '_partition_by', '_order_by', '_preceding', '_following'
def _init(self, *args, **kwargs):
self._preceding = None
self._following = None
super(Window, self)._init(*args, **kwargs)
self._preceding, self._following = \
self._de_scalar(self._preceding), self._de_scalar(self._following)
if isinstance(self._preceding, list):
self._preceding = tuple(self._preceding)
elif isinstance(self._following, list):
self._following = tuple(self._following)
if (isinstance(self._preceding, tuple) and self._following is not None) or \
(isinstance(self._following, tuple) and self._preceding is not None):
raise ValueError('Cannot specify window of both sides')
if isinstance(self._preceding, tuple):
start, end = self._preceding
if start is None:
assert end >= 0
else:
assert start > end
elif isinstance(self._following, tuple):
start, end = self._following
if end is None:
assert start >= 0
else:
assert start < end
else:
if self._preceding is not None and self._preceding < 0:
raise ValueError('Window offset must be positive')
if self._following is not None and self._following < 0:
raise ValueError('Window offset must be positive')
self._preceding, self._following = \
self._scalar(self._preceding), self._scalar(self._following)
def _scalar(self, val):
if val is None:
return
if isinstance(val, Scalar):
return val
if isinstance(val, tuple):
return tuple(self._scalar(it) for it in val)
else:
return Scalar(_value=val)
def _de_scalar(self, val):
if val is None:
return
if isinstance(val, tuple):
return tuple(self._de_scalar(it) for it in val)
elif isinstance(val, Scalar):
return val._value
else:
return val
def iter_args(self):
for it in zip(['Column', 'PartitionBy', 'OrderBy',
'preceding', 'following'], self.args):
yield it
@property
def name(self):
if self._name:
return self._name
source_name = self.source_name
if source_name:
name = self.node_name
if name.startswith('Cum'):
name = name[3:]
return '%s_%s' % (source_name, name.lower())
@property
def source_name(self):
input_name = None
if isinstance(self._input, SequenceExpr):
input_name = self._input.name
return self._source_name or input_name
@property
def node_name(self):
return self.__class__.__name__
@property
def input(self):
return self._input
@property
def preceding(self):
return self._de_scalar(self._preceding)
@property
def following(self):
return self._de_scalar(self._following)
class CumulativeOp(Window):
_args = '_input', '_partition_by', '_distinct', '_order_by', \
'_preceding', '_following'
def _init(self, *args, **kwargs):
self._distinct = False
super(CumulativeOp, self)._init(*args, **kwargs)
self._distinct = self._de_scalar(self._distinct)
self._check_unique()
self._distinct = self._scalar(self._distinct)
def _check_unique(self):
if self._distinct and len(getattr(self, '_order_by', None) or []) > 0:
raise errors.ExpressionError('Unique and sort cannot exist both')
def unique(self):
if self._distinct.value:
return self
self._check_unique()
attr_values = dict((attr, getattr(self, attr, None))
for attr in utils.get_attrs(self))
attr_values['_distinct'] = True
return type(self)(**attr_values)
def iter_args(self):
for it in zip(['Column', 'PartitionBy', 'distinct', 'OrderBy',
'preceding', 'following'], self.args):
yield it
@property
def distinct(self):
return self._distinct.value
def accept(self, visitor):
return visitor.visit_cum_window(self)
class CumSum(CumulativeOp):
__slots__ = ()
class CumMax(CumulativeOp):
__slots__ = ()
class CumMin(CumulativeOp):
__slots__ = ()
class CumMean(CumulativeOp):
__slots__ = ()
class CumMedian(CumulativeOp):
__slots__ = ()
class CumCount(CumulativeOp):
__slots__ = ()
class CumStd(CumulativeOp):
__slots__ = ()
class NthValue(CumulativeOp):
__slots__ = '_nth', '_skip_nulls'
class RankOp(Window):
__slots__ = ()
@property
def name(self):
if self._name:
return self._name
return camel_to_underline(self.__class__.__name__)
def accept(self, visitor):
return visitor.visit_rank_window(self)
class Rank(RankOp):
__slots__ = ()
class DenseRank(RankOp):
__slots__ = ()
class PercentRank(RankOp):
__slots__ = ()
class RowNumber(RankOp):
__slots__ = ()
class QCut(RankOp):
__slots__ = '_bins',
class CumeDist(RankOp):
__slots__ = ()
class ShiftOp(Window):
_args = '_input', '_partition_by', '_order_by', '_offset', '_default'
def _init(self, *args, **kwargs):
self._init_attr('_offset', None)
self._init_attr('_default', None)
super(ShiftOp, self)._init(*args, **kwargs)
if self._offset is not None and not isinstance(self._offset, Expr):
self._offset = self._scalar(self._offset)
if self._default is not None and not isinstance(self._default, Expr):
self._default = self._scalar(self._default)
def accept(self, visitor):
return visitor.visit_shift_window(self)
class Lag(ShiftOp):
__slots__ = ()
class Lead(ShiftOp):
__slots__ = ()
def _cumulative_op(expr, op_cls, sort=None, ascending=True, unique=False,
preceding=None, following=None, data_type=None, **kwargs):
if isinstance(expr, SequenceGroupBy):
if sort is not None:
groupby = expr._input.sort(sort, ascending=ascending)
expr = groupby[expr._name]
collection = expr._input._input
column = collection[expr._name]
data_type = data_type or expr._data_type
return op_cls(_input=column, _partition_by=expr._input._by,
_order_by=object_getattr(expr._input, '_sorted_fields', None),
_preceding=preceding, _following=following,
_data_type=data_type, _distinct=unique, **kwargs)
def cumsum(expr, sort=None, ascending=True, unique=False,
preceding=None, following=None):
"""
Calculate cumulative summation of a sequence expression.
:param expr: expression for calculation
:param sort: name of the sort column
:param ascending: whether to sort in ascending order
:param unique: whether to eliminate duplicate entries
:param preceding: the start point of a window
:param following: the end point of a window
:return: calculated column
"""
if expr._data_type == types.boolean:
output_type = types.int64
else:
output_type = expr._data_type
return _cumulative_op(expr, CumSum, sort=sort, ascending=ascending,
unique=unique, preceding=preceding,
following=following, data_type=output_type)
def cummax(expr, sort=None, ascending=True, unique=False,
preceding=None, following=None):
"""
Calculate cumulative maximum of a sequence expression.
:param expr: expression for calculation
:param sort: name of the sort column
:param ascending: whether to sort in ascending order
:param unique: whether to eliminate duplicate entries
:param preceding: the start point of a window
:param following: the end point of a window
:return: calculated column
"""
return _cumulative_op(expr, CumMax, sort=sort, ascending=ascending,
unique=unique, preceding=preceding,
following=following)
def cummin(expr, sort=None, ascending=True, unique=False,
preceding=None, following=None):
"""
Calculate cumulative minimum of a sequence expression.
:param expr: expression for calculation
:param sort: name of the sort column
:param ascending: whether to sort in ascending order
:param unique: whether to eliminate duplicate entries
:param preceding: the start point of a window
:param following: the end point of a window
:return: calculated column
"""
return _cumulative_op(expr, CumMin, sort=sort, ascending=ascending,
unique=unique, preceding=preceding,
following=following)
def cummean(expr, sort=None, ascending=True, unique=False,
preceding=None, following=None):
"""
Calculate cumulative mean of a sequence expression.
:param expr: expression for calculation
:param sort: name of the sort column
:param ascending: whether to sort in ascending order
:param unique: whether to eliminate duplicate entries
:param preceding: the start point of a window
:param following: the end point of a window
:return: calculated column
"""
data_type = _stats_type(expr)
return _cumulative_op(expr, CumMean, sort=sort, ascending=ascending,
unique=unique, preceding=preceding,
following=following, data_type=data_type)
def cummedian(expr, sort=None, ascending=True, unique=False,
preceding=None, following=None):
"""
Calculate cumulative median of a sequence expression.
:param expr: expression for calculation
:param sort: name of the sort column
:param ascending: whether to sort in ascending order
:param unique: whether to eliminate duplicate entries
:param preceding: the start point of a window
:param following: the end point of a window
:return: calculated column
"""
data_type = _stats_type(expr)
return _cumulative_op(expr, CumMedian, sort=sort, ascending=ascending,
unique=unique, preceding=preceding,
following=following, data_type=data_type)
def cumcount(expr, sort=None, ascending=True, unique=False,
preceding=None, following=None):
"""
Calculate cumulative count of a sequence expression.
:param expr: expression for calculation
:param sort: name of the sort column
:param ascending: whether to sort in ascending order
:param unique: whether to eliminate duplicate entries
:param preceding: the start point of a window
:param following: the end point of a window
:return: calculated column
"""
data_type = types.int64
return _cumulative_op(expr, CumCount, sort=sort, ascending=ascending,
unique=unique, preceding=preceding,
following=following, data_type=data_type)
def cumstd(expr, sort=None, ascending=True, unique=False,
preceding=None, following=None):
"""
Calculate cumulative standard deviation of a sequence expression.
:param expr: expression for calculation
:param sort: name of the sort column
:param ascending: whether to sort in ascending order
:param unique: whether to eliminate duplicate entries
:param preceding: the start point of a window
:param following: the end point of a window
:return: calculated column
"""
data_type = _stats_type(expr)
return _cumulative_op(expr, CumStd, sort=sort, ascending=ascending,
unique=unique, preceding=preceding,
following=following, data_type=data_type)
def nth_value(expr, nth, skip_nulls=False, sort=None, ascending=True):
"""
Get nth value of a grouped and sorted expression.
:param expr: expression for calculation
:param nth: integer position
:param skip_nulls: whether to skip null values, False by default
:param sort: name of the sort column
:param ascending: whether to sort in ascending order
:return: calculated column
"""
return _cumulative_op(expr, NthValue, data_type=expr._data_type, sort=sort,
ascending=ascending, _nth=nth, _skip_nulls=skip_nulls)
def _rank_op(expr, op_cls, data_type, sort=None, ascending=True, **kwargs):
if isinstance(expr, SequenceGroupBy):
grouped = expr._input
sort = sort or object_getattr(grouped, '_sorted_fields', None) or expr.name
elif not isinstance(expr, BaseGroupBy):
raise NotImplementedError
else:
grouped = expr
if not isinstance(grouped, SortedGroupBy) and sort is not None:
grouped = grouped.sort(sort, ascending=ascending)
if isinstance(expr, SequenceGroupBy):
expr = grouped[expr.name]
if not isinstance(grouped, SortedGroupBy):
raise ExpressionError('`sort` arg is required for the rank operation')
return op_cls(_input=grouped._input, _partition_by=grouped._by,
_order_by=object_getattr(grouped, '_sorted_fields', None),
_data_type=data_type, **kwargs)
def rank(expr, sort=None, ascending=True):
"""
Calculate rank of a sequence expression.
:param expr: expression for calculation
:param sort: name of the sort column
:param ascending: whether to sort in ascending order
:return: calculated column
"""
return _rank_op(expr, Rank, types.int64, sort=sort, ascending=ascending)
def dense_rank(expr, sort=None, ascending=True):
"""
Calculate dense rank of a sequence expression.
:param expr: expression for calculation
:param sort: name of the sort column
:param ascending: whether to sort in ascending order
:return: calculated column
"""
return _rank_op(expr, DenseRank, types.int64, sort=sort, ascending=ascending)
def percent_rank(expr, sort=None, ascending=True):
"""
Calculate percentage rank of a sequence expression.
:param expr: expression for calculation
:param sort: name of the sort column
:param ascending: whether to sort in ascending order
:return: calculated column
"""
return _rank_op(expr, PercentRank, types.float64, sort=sort, ascending=ascending)
def row_number(expr, sort=None, ascending=True):
"""
Calculate row number of a sequence expression.
:param expr: expression for calculation
:param sort: name of the sort column
:param ascending: whether to sort in ascending order
:return: calculated column
"""
return _rank_op(expr, RowNumber, types.int64, sort=sort, ascending=ascending)
def qcut(expr, bins, labels=False, sort=None, ascending=True):
"""
Get quantile-based bin indices of every element of a grouped and sorted expression.
The indices of bins start from 0. If cuts are not of equal sizes, extra items will
be appended into the first group.
:param expr: expression for calculation
:param bins: number of bins
:param sort: name of the sort column
:param ascending: whether to sort in ascending order
:return: calculated column
"""
if labels is None or labels:
raise NotImplementedError('Showing bins or customizing labels not supported')
return _rank_op(expr, QCut, types.int64, sort=sort, ascending=ascending,
_bins=bins)
def cume_dist(expr, sort=None, ascending=True):
"""
Calculate cumulative ratio of a sequence expression.
:param expr: expression for calculation
:param sort: name of the sort column
:param ascending: whether to sort in ascending order
:return: calculated column
"""
return _rank_op(expr, CumeDist, types.float64, sort=sort, ascending=ascending)
def _shift_op(expr, op_cls, offset, default=None, sort=None, ascending=True):
if isinstance(expr, SequenceGroupBy):
if object_getattr(expr._input, '_sorted_fields', None) is None:
sort = sort or expr.name
groupby = expr._input.sort(sort, ascending=ascending)
expr = groupby[expr._name]
collection = expr._input._input
column = collection[expr._name]
return op_cls(_input=column, _partition_by=expr._input._by,
_order_by=object_getattr(expr._input, '_sorted_fields', None),
_offset=offset, _default=default,
_name=expr._name, _data_type=expr._data_type)
def lag(expr, offset, default=None, sort=None, ascending=True):
"""
Get value in the row ``offset`` rows prior to the current row.
:param offset: the offset value
:param default: default value for the function, when there are no rows satisfying the offset
:param expr: expression for calculation
:param sort: name of the sort column
:param ascending: whether to sort in ascending order
:return: calculated column
"""
return _shift_op(expr, Lag, offset, default=default,
sort=sort, ascending=ascending)
def lead(expr, offset, default=None, sort=None, ascending=True):
"""
Get value in the row ``offset`` rows after to the current row.
:param offset: the offset value
:param default: default value for the function, when there are no rows satisfying the offset
:param expr: expression for calculation
:param sort: name of the sort column
:param ascending: whether to sort in ascending order
:return: calculated column
"""
return _shift_op(expr, Lead, offset, default=default,
sort=sort, ascending=ascending)
_number_window_methods = dict(
cumsum=cumsum,
cummean=cummean,
cummedian=cummedian,
cumstd=cumstd,
)
_window_methods = dict(
cummax=cummax,
cummin=cummin,
cumcount=cumcount,
lag=lag,
lead=lead,
)
_groupby_methods = dict(
rank=rank,
min_rank=rank,
dense_rank=dense_rank,
percent_rank=percent_rank,
row_number=row_number,
qcut=qcut,
nth_value=nth_value,
cume_dist=cume_dist,
)
number_windows = [globals().get(repr(t).capitalize() + SequenceGroupBy.__name__)
for t in types.number_types()]
for number_window in number_windows:
utils.add_method(number_window, _number_window_methods)
utils.add_method(SequenceGroupBy, _window_methods)
StringSequenceGroupBy.cumsum = cumsum # FIXME: should we support string?
BooleanSequenceGroupBy.cumsum = cumsum
utils.add_method(BaseGroupBy, _groupby_methods)
utils.add_method(SequenceGroupBy, _groupby_methods)