#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
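"""
Reduction (aggregation) expressions for PyODPS DataFrame.

This module defines scalar reductions over sequences (``Min``, ``Max``,
``Sum``, ``Mean``, ...), their ``Grouped*`` counterparts used under
``groupby``, user-defined aggregations, and the helpers that register the
corresponding methods on sequence, collection and group-by expressions.
"""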
from __future__ import absolute_import

from .expressions import *
from .groupby import *
from . import utils
from .. import types
from ...compat import six, Iterable  # six used by moment(), Iterable by aggregate()
from ..utils import FunctionWrapper
class SequenceReduction(Scalar):
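    """Base class for reductions over a whole sequence, producing a Scalar.
    The output name defaults to ``<source name>_<node name>``, e.g. ``price_sum``.
    """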
__slots__ = '_unique',
_args = '_input',
@property
def node_name(self):
return self.__class__.__name__
@property
def source_name(self):
if self._source_name:
return self._source_name
return self._input.name
@property
def name(self):
if self._name:
return self._name
source_name = self.source_name
if source_name:
return '%s_%s' % (source_name, self.node_name.lower())
def rename(self, new_name):
if new_name == self._name:
return self
attr_values = dict((attr, getattr(self, attr, None))
for attr in utils.get_attrs(self))
attr_values['_name'] = new_name
new_reduction = type(self)(**attr_values)
return new_reduction
def output_type(self):
return repr(self._value_type)
def to_grouped_reduction(self, grouped):
collection = next(n for n in self.input.traverse(top_down=True, unique=True)
if isinstance(n, CollectionExpr))
if collection is not grouped._input:
            raise ExpressionError('Aggregation should be applied to %s instead of %s' % (
                grouped._input, collection))
cls_name = 'Grouped%s' % self.__class__.__name__
clz = globals()[cls_name]
kwargs = dict((arg, getattr(self, arg, None)) for arg in utils.get_attrs(self)
if arg != '_cache_args')
kwargs['_data_type'] = kwargs.pop('_value_type')
if '_source_value_type' in kwargs:
kwargs['_source_data_type'] = kwargs.pop('_source_value_type')
kwargs['_grouped'] = grouped
del kwargs['_value']
return clz(**kwargs)
@property
def input(self):
return self._input
def accept(self, visitor):
return visitor.visit_reduction(self)
class GroupedSequenceReduction(SequenceExpr):
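    """Base class for reductions evaluated per group of a grouped sequence.
    The result is itself a sequence with one value per group.
    """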
__slots__ = '_grouped', '_unique'
_args = '_input', '_by'
_extra_args = '_grouped',
def _init(self, *args, **kwargs):
self._init_attr('_grouped', None)
self._init_attr('_by', None)
super(GroupedSequenceReduction, self)._init(*args, **kwargs)
assert self._grouped is not None
if self._by is None:
self._by = self._grouped._by
@property
def source_name(self):
if self._source_name:
return self._source_name
return self._input.name
@property
def name(self):
if self._name:
return self._name
source_name = self.source_name
if source_name:
return '%s_%s' % (source_name, self.node_name.lower())
def _repr(self):
expr = self._grouped.agg(self)[self.name]
return expr._repr()
@property
def input(self):
return self._input
def accept(self, visitor):
return visitor.visit_reduction(self)
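# Concrete reductions. Each builtin aggregation comes in two flavours: the
# plain class (whole sequence -> Scalar) and a Grouped* class (one value per
# group). The Grouped* classes define node_name, the short name used when
# deriving output column names such as ``value_mean``.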
class Min(SequenceReduction):
__slots__ = ()
class GroupedMin(GroupedSequenceReduction):
node_name = 'min'
class Max(SequenceReduction):
__slots__ = ()
class GroupedMax(GroupedSequenceReduction):
node_name = 'max'
class Count(SequenceReduction):
__slots__ = ()
class GroupedCount(GroupedSequenceReduction):
node_name = 'count'
class Sum(SequenceReduction):
__slots__ = ()
class GroupedSum(GroupedSequenceReduction):
node_name = 'sum'
class Var(SequenceReduction):
__slots__ = '_ddof',
class GroupedVar(GroupedSequenceReduction):
__slots__ = '_ddof',
node_name = 'var'
class Std(SequenceReduction):
__slots__ = '_ddof',
class GroupedStd(GroupedSequenceReduction):
__slots__ = '_ddof',
node_name = 'std'
class Mean(SequenceReduction):
__slots__ = ()
class GroupedMean(GroupedSequenceReduction):
node_name = 'mean'
class Quantile(SequenceReduction):
__slots__ = '_prob',
class GroupedQuantile(GroupedSequenceReduction):
__slots__ = '_prob',
node_name = 'quantile'
class Moment(SequenceReduction):
__slots__ = '_order', '_center'
class GroupedMoment(GroupedSequenceReduction):
__slots__ = '_order', '_center'
node_name = 'moment'
class Skewness(SequenceReduction):
__slots__ = ()
node_name = 'skew'
class GroupedSkewness(GroupedSequenceReduction):
node_name = 'skew'
class Kurtosis(SequenceReduction):
__slots__ = ()
class GroupedKurtosis(GroupedSequenceReduction):
node_name = 'kurtosis'
class Median(SequenceReduction):
__slots__ = ()
class GroupedMedian(GroupedSequenceReduction):
node_name = 'median'
class Any(SequenceReduction):
__slots__ = ()
class GroupedAny(GroupedSequenceReduction):
node_name = 'any'
class All(SequenceReduction):
__slots__ = ()
class GroupedAll(GroupedSequenceReduction):
node_name = 'all'
class NUnique(SequenceReduction):
_args = '_inputs',
@property
def source_name(self):
if self._source_name:
return self._source_name
if len(self._inputs) == 1:
return self._inputs[0].name
@property
def name(self):
if self._name:
return self._name
source_name = self.source_name
if source_name:
return '%s_%s' % (source_name, self.node_name.lower())
@property
def input(self):
return self._inputs[0]
class GroupedNUnique(GroupedSequenceReduction):
node_name = 'nunique'
_args = '_inputs', '_by'
@property
def source_name(self):
if self._source_name:
return self._source_name
if len(self._inputs) == 1:
return self._inputs[0].name
@property
def name(self):
if self._name:
return self._name
source_name = self.source_name
if source_name:
return '%s_%s' % (source_name, self.node_name.lower())
@property
def input(self):
return self._inputs[0]
class Cat(SequenceReduction):
_args = '_input', '_by', '_sep', '_na_rep'
def _init(self, *args, **kwargs):
self._init_attr('_sep', None)
self._init_attr('_na_rep', None)
super(Cat, self)._init(*args, **kwargs)
if self._sep is not None and not isinstance(self._sep, Scalar):
self._sep = Scalar(_value=self._sep)
if self._na_rep is not None and not isinstance(self._na_rep, Scalar):
self._na_rep = Scalar(_value=self._na_rep)
class GroupedCat(GroupedSequenceReduction):
_args = '_input', '_by', '_sep', '_na_rep'
node_name = 'cat'
def _init(self, *args, **kwargs):
self._init_attr('_sep', None)
self._init_attr('_na_rep', None)
super(GroupedCat, self)._init(*args, **kwargs)
if self._sep is not None and not isinstance(self._sep, Scalar):
self._sep = Scalar(_value=self._sep)
if self._na_rep is not None and not isinstance(self._na_rep, Scalar):
self._na_rep = Scalar(_value=self._na_rep)
class ToList(SequenceReduction):
__slots__ = ()
class GroupedToList(GroupedSequenceReduction):
__slots__ = ()
node_name = 'tolist'
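# User-defined aggregations. ``Aggregation`` wraps a custom aggregator over
# one or more input sequences; ``GroupedAggregation`` is its per-group
# counterpart. Both are built by ``aggregate`` below.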
class Aggregation(SequenceReduction):
__slots__ = '_aggregator', '_func_args', '_func_kwargs', '_resources', '_raw_inputs'
_args = '_inputs', '_collection_resources', '_by', '_cu_request'
node_name = 'Aggregation'
def _init(self, *args, **kwargs):
self._init_attr('_raw_inputs', None)
self._init_attr('_cu_request', None)
super(Aggregation, self)._init(*args, **kwargs)
@property
def source_name(self):
if self._source_name:
return self._source_name
if len(self._inputs) == 1:
return self._inputs[0].name
@property
def raw_inputs(self):
return self._raw_inputs or self._inputs
@property
def input(self):
return self._inputs[0]
@property
def func(self):
return self._aggregator
@func.setter
def func(self, f):
self._aggregator = f
@property
def input_types(self):
return [f.dtype for f in self._inputs]
@property
def raw_input_types(self):
if self._raw_inputs:
return [f.dtype for f in self._raw_inputs]
return self.input_types
def accept(self, visitor):
visitor.visit_user_defined_aggregator(self)
class GroupedAggregation(GroupedSequenceReduction):
__slots__ = '_aggregator', '_func_args', '_func_kwargs', '_resources', '_raw_inputs'
_args = '_inputs', '_collection_resources', '_by', '_cu_request'
node_name = 'Aggregation'
def _init(self, *args, **kwargs):
self._init_attr('_raw_inputs', None)
self._init_attr('_cu_request', None)
super(GroupedAggregation, self)._init(*args, **kwargs)
@property
def source_name(self):
if self._source_name:
return self._source_name
if len(self._inputs) == 1:
return self._inputs[0].name
@property
def raw_inputs(self):
return self._raw_inputs or self._inputs
@property
def input(self):
return self._inputs[0]
@property
def func(self):
return self._aggregator
@func.setter
def func(self, f):
self._aggregator = f
@property
def input_types(self):
return [f.dtype for f in self._inputs]
@property
def raw_input_types(self):
if self._raw_inputs:
return [f.dtype for f in self._raw_inputs]
return self.input_types
def accept(self, visitor):
visitor.visit_user_defined_aggregator(self)
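# If a column is selected from a single-field distinct projection (e.g. the
# result of ``.unique()``), return the underlying pre-distinct field so the
# caller can rewrite the reduction as a "unique" aggregation on it;
# otherwise return None.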
def _extract_unique_input(expr):
from .collections import DistinctCollectionExpr
if isinstance(expr, Column):
if isinstance(expr.input, DistinctCollectionExpr) and len(expr.input._unique_fields) == 1:
return expr.input._unique_fields[0]
else:
return None
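# Dispatch a reduction to the proper expression kind: applied to a
# CollectionExpr it reduces every column that supports the method, applied to
# a GroupBy it aggregates every column to be aggregated, applied to a
# SequenceExpr it builds the scalar reduction, and applied to a
# SequenceGroupBy it builds the matching Grouped* reduction.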
def _reduction(expr, output_cls, output_type=None, **kw):
grouped_output_cls = globals()['Grouped%s' % output_cls.__name__]
method_name = output_cls.__name__.lower()
if isinstance(expr, CollectionExpr):
columns = []
for name in expr.schema.names:
column = expr[name]
if hasattr(column, method_name):
columns.append(getattr(column, method_name)(**kw))
return expr[columns]
elif isinstance(expr, GroupBy):
aggs = []
        # TODO: dynamic may need to be rebuilt
for name in expr._to_agg.names:
agg = expr[name]
if hasattr(agg, method_name):
aggs.append(getattr(agg, method_name)(**kw))
return expr.agg(aggs)
if output_type is None:
output_type = expr._data_type
if isinstance(expr, SequenceExpr):
unique_input = _extract_unique_input(expr)
if unique_input is not None:
kw['_unique'] = True
return _reduction(unique_input, output_cls, output_type=output_type, **kw)
else:
return output_cls(_value_type=output_type, _input=expr, **kw)
elif isinstance(expr, SequenceGroupBy):
return grouped_output_cls(_data_type=output_type, _input=expr.to_column(),
_grouped=expr.input, **kw)
def min_(expr):
"""
Min value
:param expr:
:return:
"""
return _reduction(expr, Min)
def max_(expr):
"""
Max value
:param expr:
:return:
"""
return _reduction(expr, Max)
def count(expr):
"""
    Count of values in a sequence, or of rows in a collection or group.
:param expr:
:return:
"""
if isinstance(expr, SequenceExpr):
unique_input = _extract_unique_input(expr)
if unique_input:
return nunique(unique_input).rename('count')
else:
return Count(_value_type=types.int64, _input=expr)
elif isinstance(expr, SequenceGroupBy):
return GroupedCount(_data_type=types.int64, _input=expr.to_column(),
_grouped=expr.input)
elif isinstance(expr, CollectionExpr):
return Count(_value_type=types.int64, _input=expr).rename('count')
elif isinstance(expr, GroupBy):
return GroupedCount(_data_type=types.int64, _input=expr.input,
_grouped=expr).rename('count')
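# Statistical reductions keep decimal inputs as decimal and promote every
# other numeric type to float64.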
def _stats_type(expr):
if isinstance(expr, (SequenceExpr, SequenceGroupBy)):
if expr._data_type == types.decimal:
output_type = types.decimal
else:
output_type = types.float64
return output_type
def var(expr, **kw):
"""
Variance
:param expr:
    :param ddof: degrees of freedom, default 1
:param kw:
:return:
"""
ddof = kw.get('ddof', kw.get('_ddof', 1))
output_type = _stats_type(expr)
return _reduction(expr, Var, output_type, _ddof=ddof)
def sum_(expr):
"""
Sum value
:param expr:
:return:
"""
output_type = None
if isinstance(expr, (SequenceExpr, SequenceGroupBy)):
if expr._data_type == types.boolean:
output_type = types.int64
else:
output_type = expr._data_type
return _reduction(expr, Sum, output_type)
def std(expr, **kw):
"""
Standard deviation.
:param expr:
    :param ddof: degrees of freedom, default 1
    :param kw:
:return:
"""
ddof = kw.get('ddof', kw.get('_ddof', 1))
output_type = _stats_type(expr)
return _reduction(expr, Std, output_type, _ddof=ddof)
def mean(expr):
"""
Arithmetic mean.
:param expr:
:return:
"""
output_type = _stats_type(expr)
return _reduction(expr, Mean, output_type)
def median(expr):
"""
Median value.
:param expr:
:return:
"""
output_type = _stats_type(expr)
return _reduction(expr, Median, output_type)
def quantile(expr, prob=None, **kw):
"""
Percentile value.
:param expr:
:param prob: probability or list of probabilities, in [0, 1]
:return:
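
    :Example: (illustrative; assumes a numeric column ``df.sepal_length``)

    >>> df.sepal_length.quantile(0.75)          # a single probability gives a scalar
    >>> df.sepal_length.quantile([0.25, 0.75])  # a list of probabilities gives a list-typed result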
"""
prob = kw.get('_prob', prob)
output_type = _stats_type(expr)
if isinstance(prob, (list, set)) and not isinstance(expr, GroupBy):
output_type = types.List(output_type)
return _reduction(expr, Quantile, output_type, _prob=prob)
def any_(expr):
"""
    Whether any value in the sequence is True.
:param expr:
:return:
"""
output_type = types.boolean
return _reduction(expr, Any, output_type)
def all_(expr):
"""
    Whether all values in the sequence are True.
:param expr:
:return:
"""
output_type = types.boolean
return _reduction(expr, All, output_type)
def nunique(expr):
"""
The distinct count.
:param expr:
:return:
"""
output_type = types.int64
if isinstance(expr, SequenceExpr):
return NUnique(_value_type=output_type, _inputs=[expr])
elif isinstance(expr, SequenceGroupBy):
return GroupedNUnique(_data_type=output_type, _inputs=[expr.to_column()], _grouped=expr.input)
elif isinstance(expr, CollectionExpr):
unique_input = _extract_unique_input(expr)
if unique_input:
return nunique(unique_input)
else:
return NUnique(_value_type=types.int64, _inputs=expr._project_fields)
elif isinstance(expr, GroupBy):
if expr._to_agg:
inputs = expr.input[expr._to_agg.names]._project_fields
else:
inputs = expr.input._project_fields
return GroupedNUnique(_data_type=types.int64, _inputs=inputs,
_grouped=expr)
def _cat(expr, sep=None, na_rep=None):
output_type = types.string
return _reduction(expr, Cat, output_type, _sep=sep, _na_rep=na_rep)
def cat(expr, others=None, sep=None, na_rep=None):
"""
    Concatenate the strings in a sequence with the given separator.
    :param expr:
    :param others: other sequences to concatenate element-wise, if given
    :param sep: separator string or None, default None
    :param na_rep: string or None, default None; if None, NA values in the sequence are ignored
:return:
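
    :Example: (illustrative; assumes a string column ``df.name``)

    >>> df.name.cat(sep=',')
    >>> df.groupby('key').name.cat(sep='|')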
"""
if others is not None:
from .strings import _cat as cat_str
return cat_str(expr, others, sep=sep, na_rep=na_rep)
return _cat(expr, sep=sep, na_rep=na_rep)
def moment(expr, order, central=False):
"""
Calculate the n-th order moment of the sequence
:param expr:
:param order: moment order, must be an integer
    :param central: whether to compute the central moment
:return:
"""
if not isinstance(order, six.integer_types):
raise ValueError('Only integer-ordered moments are supported.')
if order < 0:
raise ValueError('Only non-negative orders are supported.')
output_type = _stats_type(expr)
return _reduction(expr, Moment, output_type, _order=order, _center=central)
def skew(expr):
"""
Calculate skewness of the sequence
:param expr:
:return:
"""
output_type = _stats_type(expr)
return _reduction(expr, Skewness, output_type)
def kurtosis(expr):
"""
Calculate kurtosis of the sequence
:param expr:
:return:
"""
output_type = _stats_type(expr)
return _reduction(expr, Kurtosis, output_type)
def aggregate(exprs, aggregator, rtype=None, resources=None, unique=False,
cu_request=None, args=(), **kwargs):
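    """
    Apply a user-defined aggregator to one or more sequences or grouped sequences.
    :param exprs: sequence, or list of sequences coming from the same collection
    :param aggregator: user-defined aggregator, possibly wrapped in a ``FunctionWrapper``;
                       the return annotation of its ``getvalue`` may supply the result type
    :param rtype: result data type, required when it cannot be inferred
    :param unique: if True, the aggregation is marked as applied to distinct values
    :return: an Aggregation or GroupedAggregation expression
    """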
name = None
if isinstance(aggregator, FunctionWrapper):
if aggregator.output_names:
if len(aggregator.output_names) > 1:
raise ValueError('Aggregate column has more than one name')
name = aggregator.output_names[0]
if aggregator.output_types:
rtype = rtype or aggregator.output_types[0]
aggregator = aggregator._func
if rtype is None:
rtype = utils.get_annotation_rtype(aggregator.getvalue)
if not isinstance(exprs, Iterable):
exprs = [exprs, ]
if rtype is None:
rtype = exprs[0].dtype
if rtype is None:
raise ValueError('rtype should be specified')
output_type = types.validate_data_type(rtype)
collection = None
if len(exprs) > 0:
for expr in exprs:
coll = next(it for it in expr.traverse(top_down=True, unique=True)
if isinstance(it, CollectionExpr))
if collection is None:
collection = coll
elif collection is not coll:
raise ValueError('The sequences to aggregate should come from the same collection')
collection_resources = utils.get_collection_resources(resources)
if all(isinstance(expr, SequenceGroupBy) for expr in exprs):
inputs = [expr.to_column() for expr in exprs]
return GroupedAggregation(_inputs=inputs, _aggregator=aggregator,
_data_type=output_type, _name=name,
_func_args=args, _func_kwargs=kwargs,
_cu_request=cu_request, _resources=resources,
_collection_resources=collection_resources,
_grouped=exprs[0].input)
else:
if not unique and len(exprs) == 1:
unique_input = _extract_unique_input(exprs[0])
if unique_input:
unique = True
exprs = [unique_input]
return Aggregation(_inputs=exprs, _aggregator=aggregator,
_value_type=output_type, _name=name,
_func_args=args, _func_kwargs=kwargs,
_cu_request=cu_request, _resources=resources,
_collection_resources=collection_resources,
_unique=unique)
def agg(*args, **kwargs):
return aggregate(*args, **kwargs)
def tolist(expr, **kwargs):
"""
Pack all data in the sequence into a list
:param expr:
    :param unique: whether to drop duplicate elements before packing
:return:
"""
unique = kwargs.get('unique', kwargs.get('_unique', False))
output_type = None
if isinstance(expr, (SequenceExpr, SequenceGroupBy)):
output_type = types.List(expr._data_type)
return _reduction(expr, ToList, output_type, _unique=unique)
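# Register the reductions above as methods on the expression classes so that,
# illustratively, ``df.groupby('key').value.mean()`` resolves to
# ``mean(df.groupby('key').value)``.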
_number_sequence_methods = dict(
var=var,
std=std,
mean=mean,
moment=moment,
skew=skew,
kurtosis=kurtosis,
kurt=kurtosis,
median=median,
sum=sum_,
quantile=quantile,
)
_sequence_methods = dict(
min=min_,
max=max_,
count=count,
size=count,
nunique=nunique,
tolist=tolist,
)
number_sequences = [globals().get(repr(t).capitalize() + SequenceExpr.__name__)
for t in types.number_types()]
for number_sequence in number_sequences:
utils.add_method(number_sequence, _number_sequence_methods)
utils.add_method(SequenceExpr, _sequence_methods)
StringSequenceExpr.sum = sum_
StringSequenceExpr.cat = cat
BooleanSequenceExpr.sum = sum_
BooleanSequenceExpr.any = any_
BooleanSequenceExpr.all = all_
SequenceExpr.aggregate = aggregate
SequenceExpr.agg = aggregate
number_sequences_groupby = [globals().get(repr(t).capitalize() + SequenceGroupBy.__name__)
for t in types.number_types()]
for number_sequence_groupby in number_sequences_groupby:
utils.add_method(number_sequence_groupby, _number_sequence_methods)
utils.add_method(SequenceGroupBy, _sequence_methods)
StringSequenceGroupBy.sum = sum_
StringSequenceGroupBy.cat = _cat
BooleanSequenceGroupBy.sum = sum_
BooleanSequenceGroupBy.any = any_
BooleanSequenceGroupBy.all = all_
SequenceGroupBy.aggregate = aggregate
SequenceGroupBy.agg = aggregate
# add method to collection expression
utils.add_method(CollectionExpr, _number_sequence_methods)
utils.add_method(CollectionExpr, _sequence_methods)
CollectionExpr.size = count
CollectionExpr.any = any_
CollectionExpr.all = all_
# add method to GroupBy
utils.add_method(GroupBy, _number_sequence_methods)
utils.add_method(GroupBy, _sequence_methods)
GroupBy.size = count
GroupBy.any = any_
GroupBy.all = all_