odps/df/expr/collections.py (1,019 lines of code) (raw):

#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright 1999-2022 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import absolute_import import json import re import time import uuid from collections import namedtuple, OrderedDict from .expressions import * from .dynamic import DynamicCollectionExpr from .arithmetic import Negate from . import utils from ..types import validate_data_type, string, DynamicSchema from ..utils import FunctionWrapper, output from ...models import TableSchema from ...compat import six, lkeys, lvalues, reduce from ...utils import str_to_kv class SortedColumn(SequenceExpr): """ Notice: we do not inherit from the Column """ __slots__ = '_ascending', _args = '_input', @property def input(self): return self._input def accept(self, visitor): return visitor.visit_sort_column(self) class SortedExpr(Expr): def _init(self, *args, **kwargs): super(SortedExpr, self)._init(*args, **kwargs) if isinstance(self._ascending, bool): self._ascending = tuple([self._ascending] * len(self._sorted_fields)) if len(self._sorted_fields) != len(self._ascending): raise ValueError('Length of ascending must be 1 or as many as the length of fields') sorted_fields = list() for i, field in enumerate(self._sorted_fields): if isinstance(field, Negate) and isinstance(field._input, SequenceExpr): field = field._input ascending = list(self._ascending) ascending[i] = False self._ascending = tuple(ascending) attr_values = dict((attr, getattr(field, attr, None)) for attr in utils.get_attrs(field)) attr_values['_ascending'] = False sorted_fields.append(SortedColumn(**attr_values)) elif isinstance(field, SortedColumn): sorted_fields.append(field) elif isinstance(field, SequenceExpr): column = SortedColumn(_input=field, _name=field.name, _source_name=field.source_name, _data_type=field._data_type, _source_data_type=field._source_data_type, _ascending=self._ascending[i]) sorted_fields.append(column) elif isinstance(field, Scalar): column = SortedColumn(_input=field, _name=field.name, _source_name=field.source_name, _data_type=field._value_type, _source_data_type=field._source_value_type, _ascending=self._ascending[i]) sorted_fields.append(column) else: from .groupby import SequenceGroupBy if isinstance(field, SequenceGroupBy): field = field.name assert isinstance(field, six.string_types) sorted_fields.append( SortedColumn(self._input[field], _name=field, _data_type=self._input._schema[field].type, _ascending=self._ascending[i])) self._sorted_fields = sorted_fields class SortedCollectionExpr(SortedExpr, CollectionExpr): __slots__ = '_ascending', _args = '_input', '_sorted_fields' node_name = 'SortBy' def iter_args(self): for it in zip(['collection', 'keys'], self.args): yield it @property def input(self): return self._input def rebuild(self): rebuilt = super(SortedCollectionExpr, self).rebuild() rebuilt._schema = self.input.schema return rebuilt def accept(self, visitor): visitor.visit_sort(self) def sort_values(expr, by, ascending=True): """ Sort the collection by values. `sort` is an alias name for `sort_values` :param expr: collection :param by: the sequence or sequences to sort :param ascending: Sort ascending vs. descending. Sepecify list for multiple sort orders. If this is a list of bools, must match the length of the by :return: Sorted collection :Example: >>> df.sort_values(['name', 'id']) # 1 >>> df.sort(['name', 'id'], ascending=False) # 2 >>> df.sort(['name', 'id'], ascending=[False, True]) # 3 >>> df.sort([-df.name, df.id]) # 4, equal to #3 """ if not isinstance(by, list): by = [by, ] by = [it(expr) if inspect.isfunction(it) else it for it in by] return SortedCollectionExpr(expr, _sorted_fields=by, _ascending=ascending, _schema=expr._schema) class DistinctCollectionExpr(CollectionExpr): __slots__ = '_all', _args = '_input', '_unique_fields' node_name = 'Distinct' def _init(self, *args, **kwargs): super(DistinctCollectionExpr, self)._init(*args, **kwargs) if self._unique_fields: self._unique_fields = list(self._input._get_field(field) for field in self._unique_fields) if not hasattr(self, '_schema'): names = [field.name for field in self._unique_fields] types = [field._data_type for field in self._unique_fields] self._schema = TableSchema.from_lists(names, types) else: self._unique_fields = list(self._input._get_field(field) for field in self._input._schema.names) self._schema = self._input._schema def iter_args(self): for it in zip(['collection', 'distinct'], self.args): yield it @property def input(self): return self._input def rebuild(self): return self._input.distinct(self._unique_fields if not self._all else []) def accept(self, visitor): return visitor.visit_distinct(self) def distinct(expr, on=None, *ons): """ Get collection with duplicate rows removed, optionally only considering certain columns :param expr: collection :param on: sequence or sequences :return: dinstinct collection :Example: >>> df.distinct(['name', 'id']) >>> df['name', 'id'].distinct() """ on = on or list() if not isinstance(on, list): on = [on, ] on = on + list(ons) on = [it(expr) if inspect.isfunction(it) else it for it in on] return DistinctCollectionExpr(expr, _unique_fields=on, _all=(len(on) == 0)) def unique(expr): if isinstance(expr, SequenceExpr): collection = next(it for it in expr.traverse(top_down=True, unique=True) if isinstance(it, CollectionExpr)) return collection.distinct(expr)[expr.name] class SampledCollectionExpr(CollectionExpr): _args = '_input', '_n', '_frac', '_parts', '_i', '_sampled_fields', '_replace', \ '_weights', '_strata', '_random_state' node_name = 'Sample' def _init(self, *args, **kwargs): for attr in self._args[1:]: self._init_attr(attr, None) super(SampledCollectionExpr, self)._init(*args, **kwargs) if not isinstance(self._n, dict): self._n = self._scalar(self._n) else: self._n = self._scalar(json.dumps(self._n)) if not isinstance(self._frac, dict): self._frac = self._scalar(self._frac) else: self._frac = self._scalar(json.dumps(self._frac)) self._parts = self._scalar(self._parts) self._i = self._scalar(self._i) self._replace = self._scalar(self._replace) self._weights = self._scalar(self._weights) self._strata = self._scalar(self._strata) self._random_state = self._scalar(self._random_state) def _scalar(self, val): if val is None: return if isinstance(val, Scalar): return val if isinstance(val, tuple): return tuple(self._scalar(it) for it in val) else: return Scalar(_value=val) @property def input(self): return self._input def rebuild(self): rebuilt = super(SampledCollectionExpr, self).rebuild() rebuilt._schema = self.input.schema return rebuilt def accept(self, visitor): return visitor.visit_sample(self) def __df_sample(expr, parts=None, columns=None, i=None, n=None, frac=None, replace=False, weights=None, strata=None, random_state=None): if columns: columns = expr.select(columns)._fields return SampledCollectionExpr(_input=expr, _parts=parts, _i=i, _sampled_fields=columns, _n=n, _frac=frac, _weights=weights, _strata=strata, _random_state=random_state, _replace=replace, _schema=expr.schema) def sample(expr, parts=None, columns=None, i=None, n=None, frac=None, replace=False, weights=None, strata=None, random_state=None): """ Sample collection. :param expr: collection :param parts: how many parts to hash :param columns: the columns to sample :param i: the part to sample out, can be a list of parts, must be from 0 to parts-1 :param n: how many rows to sample. If `strata` is specified, `n` should be a dict with values in the strata column as dictionary keys and corresponding sample size as values :param frac: how many fraction to sample. If `strata` is specified, `n` should be a dict with values in the strata column as dictionary keys and corresponding sample weight as values :param replace: whether to perform replace sampling :param weights: the column name of weights :param strata: the name of strata column :param random_state: the random seed when performing sampling :return: collection Note that n, frac, replace, weights, strata and random_state can only be used under Pandas DataFrames or XFlow. :Example: Sampling with parts: >>> df.sample(parts=1) >>> df.sample(parts=5, i=0) >>> df.sample(parts=10, columns=['name']) Sampling with fraction or weights, replacement option can be specified: >>> df.sample(n=100) >>> df.sample(frac=0.1) >>> df.sample(frac=0.1, replace=True) Sampling with weight column: >>> df.sample(n=100, weights='weight_col') >>> df.sample(n=100, weights='weight_col', replace=True) Stratified sampling. Note that currently we do not support stratified sampling with replacement. >>> df.sample(strata='category', frac={'Iris Setosa': 0.5, 'Iris Versicolour': 0.4}) """ if isinstance(expr, CollectionExpr): if n is None and frac is None and parts is None: raise ExpressionError('Either n or frac or parts should be provided') if i is not None and parts is None: raise ExpressionError('`parts` arg is required when `i` arg is specified') if len([arg for arg in (n, frac, parts) if arg is not None]) > 1: raise ExpressionError('You cannot specify `n` or `frac` or `parts` at the same time') if strata is None and n is not None and frac is not None: # strata can specify different types of strategies on different columns raise ExpressionError('You cannot specify `n` and `frac` at the same time.') if weights is not None and strata is not None: raise ExpressionError('You cannot specify `weights` and `strata` at the same time.') if strata is not None: if frac is not None and not isinstance(frac, (six.string_types, dict)): raise ExpressionError('`frac` should be a k-v string or a dictionary object.') if isinstance(frac, six.string_types): frac = str_to_kv(frac, float) if n is not None and not isinstance(n, (six.string_types, dict)): raise ExpressionError('`n` should be a k-v string or a dictionary object.') if isinstance(n, six.string_types): n = str_to_kv(n, int) for val in six.itervalues(frac or dict()): if val < 0 or val > 1: raise ExpressionError('Values in `frac` must be between 0 and 1') if n is not None and frac is not None: collides = set(six.iterkeys(n)).intersection(set(six.iterkeys(frac))) if collides: raise ExpressionError('Values in `frac` and `n` collides with each other.') else: if frac is not None and (not isinstance(frac, (six.integer_types, float)) or frac < 0 or frac > 1): raise ExpressionError('`frac` must be between 0 and 1') if parts is not None: if i is None: i = (0, ) elif isinstance(i, list): i = tuple(i) elif not isinstance(i, tuple): i = (i, ) for it in i: if it >= parts or it < 0: raise ExpressionError('`i` should be positive numbers that less than `parts`') elif not options.df.use_xflow_sample and not replace and weights is None and strata is None: if frac is not None and frac < 0.01: raise ValueError( "Does not support sampling less than 1%. Try sampling by count or " "set options.df.use_xflow_sample to True." ) elif hasattr(expr, '_xflow_sample'): return expr._xflow_sample(n=n, frac=frac, replace=replace, weights=weights, strata=strata, random_state=random_state) return expr.__sample(parts=parts, columns=columns, i=i, n=n, frac=frac, replace=replace, weights=weights, strata=strata, random_state=random_state) class RowAppliedCollectionExpr(CollectionExpr): __slots__ = '_func', '_func_args', '_func_kwargs', '_close_func', \ '_resources', '_raw_inputs', '_lateral_view', '_keep_nulls', \ '_cu_request' _args = '_input', '_fields', '_collection_resources' node_name = 'Apply' def _init(self, *args, **kwargs): self._init_attr('_raw_inputs', None) self._init_attr('_lateral_view', False) self._init_attr('_cu_request', None) super(RowAppliedCollectionExpr, self)._init(*args, **kwargs) @property def input(self): return self._input @property def fields(self): return self._fields @property def input_types(self): return [f.dtype for f in self._fields] @property def raw_input_types(self): if self._raw_inputs: return [f.dtype for f in self._raw_inputs] return self.input_types @property def func(self): return self._func @func.setter def func(self, f): self._func = f def accept(self, visitor): return visitor.visit_apply_collection(self) def _apply_horizontal(expr, func, names=None, types=None, resources=None, collection_resources=None, keep_nulls=False, cu_request=None, args=(), **kwargs): if isinstance(func, FunctionWrapper): names = names or func.output_names types = types or func.output_types func = func._func if names is not None: if isinstance(names, list): names = tuple(names) elif isinstance(names, six.string_types): names = (names,) if names is None: raise ValueError( 'Apply on rows to provide multiple values should provide all column names, ' 'for instance, df.apply(func, axis=1, names=["A", "B"], types=["float", "float"]). ' 'See https://pyodps.readthedocs.io/zh_CN/latest/df-sort-distinct-apply.html#dfudtfapp ' 'for more information.' ) tps = (string,) * len(names) if types is None else tuple(validate_data_type(t) for t in types) schema = TableSchema.from_lists(names, tps) collection_resources = collection_resources or \ utils.get_collection_resources(resources) return RowAppliedCollectionExpr(_input=expr, _func=func, _func_args=args, _func_kwargs=kwargs, _schema=schema, _fields=[expr[n] for n in expr.schema.names], _keep_nulls=keep_nulls, _resources=resources, _collection_resources=collection_resources, _cu_request=cu_request) def apply(expr, func, axis=0, names=None, types=None, reduce=False, resources=None, keep_nulls=False, cu_request=None, args=(), **kwargs): """ Apply a function to a row when axis=1 or column when axis=0. :param expr: :param func: function to apply :param axis: row when axis=1 else column :param names: output names :param types: output types :param reduce: if True will return a sequence else return a collection :param resources: resources to read :param keep_nulls: if True, keep rows producing empty results, only work in lateral views :param args: args for function :param kwargs: kwargs for function :return: :Example: Apply a function to a row: >>> from odps.df import output >>> >>> @output(['iris_add', 'iris_sub'], ['float', 'float']) >>> def handle(row): >>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth >>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth >>> >>> iris.apply(handle, axis=1).count() Apply a function to a column: >>> class Agg(object): >>> >>> def buffer(self): >>> return [0.0, 0] >>> >>> def __call__(self, buffer, val): >>> buffer[0] += val >>> buffer[1] += 1 >>> >>> def merge(self, buffer, pbuffer): >>> buffer[0] += pbuffer[0] >>> buffer[1] += pbuffer[1] >>> >>> def getvalue(self, buffer): >>> if buffer[1] == 0: >>> return 0.0 >>> return buffer[0] / buffer[1] >>> >>> iris.exclude('name').apply(Agg) """ if types is None and "rtype" in kwargs: types = kwargs.pop("rtype") if not isinstance(expr, CollectionExpr): return if isinstance(func, FunctionWrapper): names = names or func.output_names types = types or func.output_types func = func._func if axis == 0: types = types or expr.schema.types types = [validate_data_type(t) for t in types] fields = [expr[n].agg(func, rtype=t, resources=resources) for n, t in zip(expr.schema.names, types)] if names: fields = [f.rename(n) for f, n in zip(fields, names)] else: names = [f.name for f in fields] return Summary(_input=expr, _fields=fields, _schema=TableSchema.from_lists(names, types)) else: collection_resources = utils.get_collection_resources(resources) if types is not None: if isinstance(types, list): types = tuple(types) elif isinstance(types, six.string_types): types = (types,) types = tuple(validate_data_type(t) for t in types) if reduce: from .element import MappedExpr from ..backends.context import context if names is not None and len(names) > 1: raise ValueError('When reduce, at most one name can be specified') name = names[0] if names is not None else None if not types and kwargs.get('rtype', None) is not None: types = [kwargs.pop('rtype')] tp = types[0] if types is not None else (utils.get_annotation_rtype(func) or string) if not context.is_cached(expr) and (hasattr(expr, '_fields') and expr._fields is not None): inputs = [e.copy_tree(stop_cond=lambda x: any(i is expr.input for i in x.children())) for e in expr._fields] else: inputs = [expr[n] for n in expr.schema.names] return MappedExpr(_func=func, _func_args=args, _func_kwargs=kwargs, _name=name, _data_type=tp, _inputs=inputs, _multiple=True, _resources=resources, _cu_request=cu_request, _collection_resources=collection_resources) else: return _apply_horizontal(expr, func, names=names, types=types, resources=resources, collection_resources=collection_resources, keep_nulls=keep_nulls, cu_request=cu_request, args=args, **kwargs) class ReshuffledCollectionExpr(CollectionExpr): _args = '_input', '_by', '_sort_fields' node_name = 'Reshuffle' def _init(self, *args, **kwargs): from .groupby import BaseGroupBy, SortedGroupBy self._init_attr('_sort_fields', None) super(ReshuffledCollectionExpr, self)._init(*args, **kwargs) if isinstance(self._input, BaseGroupBy): if isinstance(self._input, SortedGroupBy): self._sort_fields = self._input._sorted_fields self._by = self._input._by self._input = self._input._input @property def fields(self): return self._by + (self._sort_fields or list()) @property def input(self): return self._input def iter_args(self): arg_names = ['collection', 'bys', 'sort'] for it in zip(arg_names, self.args): yield it def accept(self, visitor): return visitor.visit_reshuffle(self) def reshuffle(expr, by=None, sort=None, ascending=True): """ Reshuffle data. :param expr: :param by: the sequence or scalar to shuffle by. RandomScalar as default :param sort: the sequence or scalar to sort. :param ascending: True if ascending else False :return: collection """ by = by or RandomScalar() grouped = expr.groupby(by) if sort: grouped = grouped.sort_values(sort, ascending=ascending) return ReshuffledCollectionExpr(_input=grouped, _schema=expr._schema) def map_reduce(expr, mapper=None, reducer=None, group=None, sort=None, ascending=True, combiner=None, combiner_buffer_size=1024, mapper_output_names=None, mapper_output_types=None, mapper_resources=None, mapper_cu=None, reducer_output_names=None, reducer_output_types=None, reducer_resources=None, reducer_cu=None): """ MapReduce API, mapper or reducer should be provided. :param expr: :param mapper: mapper function or class :param reducer: reducer function or class :param group: the keys to group after mapper :param sort: the keys to sort after mapper :param ascending: True if ascending else False :param combiner: combiner function or class, combiner's output should be equal to mapper :param combiner_buffer_size: combiner's buffer size, 1024 as default :param mapper_output_names: mapper's output names :param mapper_output_types: mapper's output types :param mapper_resources: the resources for mapper :param reducer_output_names: reducer's output names :param reducer_output_types: reducer's output types :param reducer_resources: the resources for reducer :return: :Example: >>> from odps.df import output >>> >>> @output(['word', 'cnt'], ['string', 'int']) >>> def mapper(row): >>> for word in row[0].split(): >>> yield word.lower(), 1 >>> >>> @output(['word', 'cnt'], ['string', 'int']) >>> def reducer(keys): >>> cnt = [0] >>> def h(row, done): # done illustrates that all the rows of the keys are processed >>> cnt[0] += row.cnt >>> if done: >>> yield keys.word, cnt[0] >>> return h >>> >>> words_df.map_reduce(mapper, reducer, group='word') """ def _adjust_partial(fun): if isinstance(fun, functools.partial) and isinstance(fun.func, FunctionWrapper): wrapped_fun = fun.func partial_fun = functools.partial(wrapped_fun._func, *fun.args, **fun.keywords) ret_fun = FunctionWrapper(partial_fun) ret_fun.output_names = wrapped_fun.output_names ret_fun.output_types = wrapped_fun.output_types return ret_fun else: return fun def conv(l): if l is None: return if isinstance(l, tuple): l = list(l) elif not isinstance(l, list): l = [l, ] return l def gen_name(): return 'pyodps_field_%s' % str(uuid.uuid4()).replace('-', '_') def _gen_actual_reducer(reducer, group): class ActualReducer(object): def __init__(self, resources=None): self._func = reducer self._curr = None self._prev_rows = None self._key_named_tuple = namedtuple('NamedKeys', group) self._resources = resources self._f = None def _is_generator_function(self, f): if inspect.isgeneratorfunction(f): return True elif hasattr(f, '__call__') and inspect.isgeneratorfunction(f.__call__): return True return False def __call__(self, row): key = tuple(getattr(row, n) for n in group) k = self._key_named_tuple(*key) if self._prev_rows is not None: key_consumed = self._curr != key if self._is_generator_function(self._f): for it in self._f(self._prev_rows, key_consumed): yield it else: res = self._f(self._prev_rows, key_consumed) if res: yield res self._prev_rows = row if self._curr is None or self._curr != key: self._curr = key if self._resources and self._f is None: self._func = self._func(self._resources) self._f = self._func(k) def close(self): if self._prev_rows and self._curr: if self._is_generator_function(self._f): for it in self._f(self._prev_rows, True): yield it else: res = self._f(self._prev_rows, True) if res: yield res self._prev_rows = None return ActualReducer def _gen_combined_mapper(mapper, combiner, names, group, sort, ascending, buffer_size, mapper_resources=None): mapper = mapper if not isinstance(mapper, FunctionWrapper) else mapper._func sort_indexes = [names.index(s) for s in sort] if isinstance(ascending, bool): ascending = [ascending] * len(sort) class CombinedMapper(object): def __init__(self, resources=None): if mapper_resources: self.f = mapper(resources) elif inspect.isclass(mapper): self.f = mapper() else: self.f = mapper self.buffer = list() if inspect.isfunction(self.f): self.is_generator = inspect.isgeneratorfunction(self.f) else: self.is_generator = inspect.isgeneratorfunction(self.f.__call__) def _cmp_to_key(self, cmp): """Convert a cmp= function into a key= function""" class K(object): def __init__(self, obj): self.obj = obj def __lt__(self, other): return cmp(self.obj, other.obj) < 0 def __gt__(self, other): return cmp(self.obj, other.obj) > 0 def __eq__(self, other): return cmp(self.obj, other.obj) == 0 def __le__(self, other): return cmp(self.obj, other.obj) <= 0 def __ge__(self, other): return cmp(self.obj, other.obj) >= 0 def __ne__(self, other): return cmp(self.obj, other.obj) != 0 return K def _combine(self): def cmp(x, y): for asc, sort_idx in zip(ascending, sort_indexes): indict = 1 if asc else -1 if x[sort_idx] > y[sort_idx]: return indict * 1 elif x[sort_idx] < y[sort_idx]: return indict * -1 else: continue return 0 self.buffer.sort(key=self._cmp_to_key(cmp)) ActualCombiner = _gen_actual_reducer(combiner, group) ac = ActualCombiner() named_row = namedtuple('NamedRow', names) for r in self.buffer: row = named_row(*r) for l in ac(row): yield l for l in ac.close(): yield l self.buffer = [] def _handle_output_line(self, line): if len(self.buffer) >= buffer_size: for l in self._combine(): yield l self.buffer.append(line) def __call__(self, row): if self.is_generator: for it in self.f(row): for l in self._handle_output_line(it): yield l else: for l in self._handle_output_line(self.f(row)): yield l def close(self): if len(self.buffer) > 0: for l in self._combine(): yield l return CombinedMapper mapper = _adjust_partial(mapper) reducer = _adjust_partial(reducer) combiner = _adjust_partial(combiner) if isinstance(mapper, FunctionWrapper): mapper_output_names = mapper_output_names or mapper.output_names mapper_output_types = mapper_output_types or mapper.output_types mapper_output_names = conv(mapper_output_names) mapper_output_types = conv(mapper_output_types) if mapper_output_types is not None and mapper_output_names is None: mapper_output_names = [gen_name() for _ in range(len(mapper_output_types))] if mapper is None and mapper_output_names is None: mapper_output_names = expr.schema.names group = conv(group) or mapper_output_names sort = sort or tuple() sort = list(OrderedDict.fromkeys(group + conv(sort))) if len(sort) > len(group): ascending = [ascending, ] * (len(sort) - len(group)) \ if isinstance(ascending, bool) else list(ascending) if len(ascending) != len(sort): ascending = [True] * len(group) + ascending if not set(group + sort).issubset(mapper_output_names): raise ValueError('group and sort have to be the column names of mapper') if mapper is None: if mapper_output_names and mapper_output_names != expr.schema.names: raise ExpressionError( 'Null mapper cannot have mapper output names: %s' % mapper_output_names) if mapper_output_types and mapper_output_types != expr.schema.types: raise ExpressionError( 'Null mapper cannot have mapper output types: %s' % mapper_output_types) mapped = expr if combiner is not None: raise ValueError('Combiner is not null when mapper is null') else: if combiner is not None: if isinstance(combiner, FunctionWrapper): if combiner.output_names and \ combiner.output_names != mapper_output_names: raise ExpressionError( 'Combiner must have the same output names with mapper') if combiner.output_types and \ combiner.output_types != mapper_output_types: raise ExpressionError( 'Combiner must have the same output types with mapper') combiner = combiner._func mapper = _gen_combined_mapper(mapper, combiner, mapper_output_names, group, sort, ascending, combiner_buffer_size, mapper_resources=mapper_resources) mapped = expr.apply(mapper, axis=1, names=mapper_output_names, types=mapper_output_types, resources=mapper_resources, cu_request=mapper_cu) clustered = mapped.groupby(group).sort(sort, ascending=ascending) if isinstance(reducer, FunctionWrapper): reducer_output_names = reducer_output_names or reducer.output_names reducer_output_types = reducer_output_types or reducer.output_types reducer = reducer._func if reducer is None: if reducer_output_names and reducer_output_names != mapped.schema.names: raise ExpressionError( 'Null reducer cannot have reducer output names %s' % reducer_output_names) if reducer_output_types and reducer_output_types != mapped.schema.types: raise ExpressionError( 'Null reducer cannot have reducer output types %s' % reducer_output_types) return mapped ActualReducer = _gen_actual_reducer(reducer, group) return clustered.apply(ActualReducer, resources=reducer_resources, names=reducer_output_names, types=reducer_output_types, cu_request=reducer_cu) class PivotCollectionExpr(DynamicCollectionExpr): _args = '_input', '_group', '_columns', '_values' node_name = 'Pivot' def _init(self, *args, **kwargs): self._init_attr('_group', None) self._init_attr('_columns', None) self._init_attr('_values', None) super(PivotCollectionExpr, self)._init(*args, **kwargs) if not hasattr(self, '_schema'): self._schema = DynamicSchema.from_lists( [f.name for f in self._group], [f.dtype for f in self._group] ) def iter_args(self): for it in zip(['collection', 'group', 'columns', 'values'], self.args): yield it @property def input(self): return self._input def accept(self, visitor): return visitor.visit_pivot(self) def pivot(expr, rows, columns, values=None): """ Produce ‘pivot’ table based on 3 columns of this DataFrame. Uses unique values from rows / columns and fills with values. :param expr: collection :param rows: use to make new collection's grouped rows :param columns: use to make new collection's columns :param values: values to use for populating new collection's values :return: collection :Example: >>> df.pivot(rows='id', columns='category') >>> df.pivot(rows='id', columns='category', values='sale') >>> df.pivot(rows=['id', 'id2'], columns='category', values='sale') """ rows = [expr._get_field(r) for r in utils.to_list(rows)] columns = [expr._get_field(c) for c in utils.to_list(columns)] if values: values = utils.to_list(values) else: names = set(c.name for c in rows + columns) values = [n for n in expr.schema.names if n not in names] if not values: raise ValueError('No values found for pivot') values = [expr._get_field(v) for v in values] if len(columns) > 1: raise ValueError('More than one `columns` are not supported yet') return PivotCollectionExpr(_input=expr, _group=rows, _columns=columns, _values=values) def melt(expr, id_vars=None, value_vars=None, var_name='variable', value_name='value', ignore_nan=False): """ “Unpivots” a DataFrame from wide format to long format, optionally leaving identifier variables set. This function is useful to massage a DataFrame into a format where one or more columns are identifier variables (id_vars), while all other columns, considered measured variables (value_vars), are “unpivoted” to the row axis, leaving just two non-identifier columns, ‘variable’ and ‘value’. :param expr: collection :param id_vars: column(s) to use as identifier variables. :param value_vars: column(s) to unpivot. If not specified, uses all columns that are not set as id_vars. :param var_name: name to use for the ‘variable’ column. If None it uses frame.columns.name or ‘variable’. :param value_name: name to use for the ‘value’ column. :param ignore_nan: whether to ignore NaN values in data. :return: collection :Example: >>> df.melt(id_vars='id', value_vars=['col1', 'col2']) >>> df.melt(id_vars=['id', 'id2'], value_vars=['col1', 'col2'], var_name='variable') """ id_vars = id_vars or [] id_vars = [expr._get_field(r) for r in utils.to_list(id_vars)] if not value_vars: id_names = set([c.name for c in id_vars]) value_vars = [expr._get_field(c) for c in expr.schema.names if c not in id_names] else: value_vars = [expr._get_field(c) for c in value_vars] col_type = utils.highest_precedence_data_type(*[c.dtype for c in value_vars]) col_names = [c.name for c in value_vars] id_names = [r.name for r in id_vars] names = id_names + [var_name, value_name] dtypes = [r.dtype for r in id_vars] + [types.string, col_type] @output(names, dtypes) def mapper(row): for cn in col_names: col_value = getattr(row, cn) if ignore_nan and col_value is None: continue vals = [getattr(row, rn) for rn in id_names] yield tuple(vals + [cn, col_value]) return expr.map_reduce(mapper) class PivotTableCollectionExpr(CollectionExpr): __slots__ = '_agg_func', '_agg_func_names' _args = '_input', '_group', '_columns', '_values', '_fill_value' node_name = 'PivotTable' def _init(self, *args, **kwargs): for arg in self._args: self._init_attr(arg, None) super(PivotTableCollectionExpr, self)._init(*args, **kwargs) for attr in ('_fill_value', ): val = getattr(self, attr, None) if val is not None and not isinstance(val, Scalar): setattr(self, attr, Scalar(_value=val)) @property def input(self): return self._input @property def fill_value(self): if self._fill_value: return self._fill_value.value @property def margins(self): return self._margins.value @property def margins_name(self): return self._margins_name.value def accept(self, visitor): return visitor.visit_pivot(self) def pivot_table(expr, values=None, rows=None, columns=None, aggfunc='mean', fill_value=None): """ Create a spreadsheet-style pivot table as a DataFrame. :param expr: collection :param values (optional): column to aggregate :param rows: rows to group :param columns: keys to group by on the pivot table column :param aggfunc: aggregate function or functions :param fill_value (optional): value to replace missing value with, default None :return: collection :Example: >>> df A B C D 0 foo one small 1 1 foo one large 2 2 foo one large 2 3 foo two small 3 4 foo two small 3 5 bar one large 4 6 bar one small 5 7 bar two small 6 8 bar two large 7 >>> table = df.pivot_table(values='D', rows=['A', 'B'], columns='C', aggfunc='sum') >>> table A B large_D_sum small_D_sum 0 bar one 4.0 5.0 1 bar two 7.0 6.0 2 foo one 4.0 1.0 3 foo two NaN 6.0 """ def get_names(iters): return [r if isinstance(r, six.string_types) else r.name for r in iters] def get_aggfunc_name(f): if isinstance(f, six.string_types): if '(' in f: f = re.sub(r' *\( *', '_', f) f = re.sub(r' *[+\-\*/,] *', '_', f) f = re.sub(r' *\) *', '', f) f = f.replace('.', '_') return f if isinstance(f, FunctionWrapper): return f.output_names[0] return 'aggregation' if not rows: raise ValueError('No group keys passed') rows = utils.to_list(rows) rows_names = get_names(rows) rows = [expr._get_field(r) for r in rows] if isinstance(aggfunc, dict): agg_func_names = lkeys(aggfunc) aggfunc = lvalues(aggfunc) else: aggfunc = utils.to_list(aggfunc) agg_func_names = [get_aggfunc_name(af) for af in aggfunc] if not columns: if values is None: values = [n for n in expr.schema.names if n not in rows_names] else: values = utils.to_list(values) values = [expr._get_field(v) for v in values] names = rows_names types = [r.dtype for r in rows] for func, func_name in zip(aggfunc, agg_func_names): for value in values: if isinstance(func, six.string_types): seq = value.eval(func, rewrite=False) if isinstance(seq, ReprWrapper): seq = seq() else: seq = value.agg(func) seq = seq.rename('{0}_{1}'.format(value.name, func_name)) names.append(seq.name) types.append(seq.dtype) schema = TableSchema.from_lists(names, types) return PivotTableCollectionExpr(_input=expr, _group=rows, _values=values, _fill_value=fill_value, _schema=schema, _agg_func=aggfunc, _agg_func_names=agg_func_names) else: columns = [expr._get_field(c) for c in utils.to_list(columns)] if values: values = utils.to_list(values) else: names = set(c.name for c in rows + columns) values = [n for n in expr.schema.names if n not in names] if not values: raise ValueError('No values found for pivot_table') values = [expr._get_field(v) for v in values] if len(columns) > 1: raise ValueError('More than one `columns` are not supported yet') schema = DynamicSchema.from_lists(rows_names, [r.dtype for r in rows]) base_tp = PivotTableCollectionExpr tp = type(base_tp.__name__, (DynamicCollectionExpr, base_tp), dict()) return tp(_input=expr, _group=rows, _values=values, _columns=columns, _agg_func=aggfunc, _fill_value=fill_value, _schema=schema, _agg_func_names=agg_func_names) def _scale_values(expr, columns, agg_fun, scale_fun, preserve=False, suffix='_scaled', group=None): from ..types import Float, Integer time_suffix = str(int(time.time())) if group is not None: group = utils.to_list(group) group = [expr._get_field(c).name if isinstance(c, Column) else c for c in group] if columns is None: if group is None: columns = expr.schema.names else: columns = [n for n in expr.schema.names if n not in group] else: columns = utils.to_list(columns) columns = [expr._get_field(v) for v in columns] numerical_cols = [col.name for col in columns if isinstance(col.data_type, (Float, Integer))] agg_cols = [] for col_name in numerical_cols: agg_cols.extend(agg_fun(expr, col_name)) if group is None: # make a fake constant column to join extra_col = 'idx_col_' + time_suffix join_cols = [extra_col] stats_df = expr.__getitem__([Scalar(1).rename(extra_col)] + agg_cols) mapped = expr[expr, Scalar(1).rename(extra_col)] else: extra_col = None join_cols = group stats_df = expr.groupby(join_cols).agg(*agg_cols) mapped = expr joined = mapped.join(stats_df, on=join_cols, mapjoin=True) if extra_col is not None: joined = joined.exclude(extra_col) if preserve: norm_cols = [dt.name for dt in expr.dtypes] norm_cols.extend([scale_fun(joined, dt.name).rename(dt.name + suffix) for dt in expr.dtypes if dt.name in numerical_cols]) else: norm_cols = [scale_fun(joined, dt.name).rename(dt.name) if dt.name in numerical_cols else getattr(joined, dt.name) for dt in expr.dtypes] return joined.__getitem__(norm_cols) def min_max_scale(expr, columns=None, feature_range=(0, 1), preserve=False, suffix='_scaled', group=None): """ Resize a data frame by max / min values, i.e., (X - min(X)) / (max(X) - min(X)) :param DataFrame expr: input DataFrame :param feature_range: the target range to resize the value into, i.e., v * (b - a) + a :param bool preserve: determine whether input data should be kept. If True, scaled input data will be appended to the data frame with `suffix` :param columns: columns names to resize. If set to None, float or int-typed columns will be normalized if the column is not specified as a group column. :param group: determine scale groups. Scaling will be done in each group separately. :param str suffix: column suffix to be appended to the scaled columns. :return: resized data frame :rtype: DataFrame """ time_suffix = str(int(time.time())) def calc_agg(expr, col): return [ getattr(expr, col).min().rename(col + '_min_' + time_suffix), getattr(expr, col).max().rename(col + '_max_' + time_suffix), ] def do_scale(expr, col): f_min, f_max = feature_range r = getattr(expr, col + '_max_' + time_suffix) - getattr(expr, col + '_min_' + time_suffix) scaled = (r == 0).ifelse(Scalar(0), (getattr(expr, col) - getattr(expr, col + '_min_' + time_suffix)) / r) return scaled * (f_max - f_min) + f_min return _scale_values(expr, columns, calc_agg, do_scale, preserve=preserve, suffix=suffix, group=group) def std_scale(expr, columns=None, with_means=True, with_std=True, preserve=False, suffix='_scaled', group=None): """ Resize a data frame by mean and standard error. :param DataFrame expr: Input DataFrame :param bool with_means: Determine whether the output will be subtracted by means :param bool with_std: Determine whether the output will be divided by standard deviations :param bool preserve: Determine whether input data should be kept. If True, scaled input data will be appended to the data frame with `suffix` :param columns: Columns names to resize. If set to None, float or int-typed columns will be normalized if the column is not specified as a group column. :param group: determine scale groups. Scaling will be done in each group separately. :param str suffix: column suffix to be appended to the scaled columns. :return: resized data frame :rtype: DataFrame """ time_suffix = str(int(time.time())) def calc_agg(expr, col): return [ getattr(expr, col).mean().rename(col + '_mean_' + time_suffix), getattr(expr, col).std(ddof=0).rename(col + '_std_' + time_suffix), ] def do_scale(expr, col): c = getattr(expr, col) mean_expr = getattr(expr, col + '_mean_' + time_suffix) if with_means: c = c - mean_expr mean_expr = Scalar(0) if with_std: std_expr = getattr(expr, col + '_std_' + time_suffix) c = (std_expr == 0).ifelse(mean_expr, c / getattr(expr, col + '_std_' + time_suffix)) return c return _scale_values(expr, columns, calc_agg, do_scale, preserve=preserve, suffix=suffix, group=group) class ExtractKVCollectionExpr(DynamicCollectionExpr): __slots__ = '_column_type', _args = '_input', '_columns', '_intact', '_kv_delimiter', '_item_delimiter', '_default' node_name = 'ExtractKV' def _init(self, *args, **kwargs): from .element import _scalar for attr in self._args[1:]: self._init_attr(attr, None) super(ExtractKVCollectionExpr, self)._init(*args, **kwargs) self._kv_delimiter = _scalar(self._kv_delimiter) self._item_delimiter = _scalar(self._item_delimiter) self._default = _scalar(self._default) @property def input(self): return self._input def accept(self, visitor): visitor.visit_extract_kv(self) def extract_kv(expr, columns=None, kv_delim=':', item_delim=',', dtype='float', fill_value=None): """ Extract values in key-value represented columns into standalone columns. New column names will be the name of the key-value column followed by an underscore and the key. :param DataFrame expr: input DataFrame :param columns: the key-value columns to be extracted. :param str kv_delim: delimiter between key and value. :param str item_delim: delimiter between key-value pairs. :param str dtype: type of value columns to generate. :param fill_value: default value for missing key-value pairs. :return: extracted data frame :rtype: DataFrame :Example: >>> df name kv 0 name1 k1=1.0,k2=3.0,k5=10.0 1 name2 k2=3.0,k3=5.1 2 name3 k1=7.1,k7=8.2 3 name4 k2=1.2,k3=1.5 4 name5 k2=1.0,k9=1.1 >>> table = df.extract_kv(columns=['A', 'B'], kv_delim='=') >>> table name kv_k1 kv_k2 kv_k3 kv_k5 kv_k7 kv_k9 0 name1 1.0 3.0 Nan 10.0 Nan Nan 1 name2 Nan 3.0 5.1 Nan Nan Nan 2 name3 7.1 Nan Nan Nan 8.2 Nan 3 name4 Nan 1.2 1.5 Nan Nan Nan 4 name5 Nan 1.0 Nan Nan Nan 1.1 """ if columns is None: columns = [expr._get_field(c) for c in expr.schema.names] intact_cols = [] else: columns = [expr._get_field(c) for c in utils.to_list(columns)] name_set = set([c.name for c in columns]) intact_cols = [expr._get_field(c) for c in expr.schema.names if c not in name_set] column_type = types.validate_data_type(dtype) if any(not isinstance(c.dtype, types.String) for c in columns): raise ExpressionError('Key-value columns must be strings.') schema = DynamicSchema.from_lists([c.name for c in intact_cols], [c.dtype for c in intact_cols]) return ExtractKVCollectionExpr(_input=expr, _columns=columns, _intact=intact_cols, _schema=schema, _column_type=column_type, _default=fill_value, _kv_delimiter=kv_delim, _item_delimiter=item_delim) def to_kv(expr, columns=None, kv_delim=':', item_delim=',', kv_name='kv_col'): """ Merge values in specified columns into a key-value represented column. :param DataFrame expr: input DataFrame :param columns: the columns to be merged. :param str kv_delim: delimiter between key and value. :param str item_delim: delimiter between key-value pairs. :param str kv_col: name of the new key-value column :return: converted data frame :rtype: DataFrame :Example: >>> df name k1 k2 k3 k5 k7 k9 0 name1 1.0 3.0 Nan 10.0 Nan Nan 1 name2 Nan 3.0 5.1 Nan Nan Nan 2 name3 7.1 Nan Nan Nan 8.2 Nan 3 name4 Nan 1.2 1.5 Nan Nan Nan 4 name5 Nan 1.0 Nan Nan Nan 1.1 >>> table = df.to_kv(columns=['A', 'B'], kv_delim='=') >>> table name kv_col 0 name1 k1=1.0,k2=3.0,k5=10.0 1 name2 k2=3.0,k3=5.1 2 name3 k1=7.1,k7=8.2 3 name4 k2=1.2,k3=1.5 4 name5 k2=1.0,k9=1.1 """ if columns is None: columns = [expr._get_field(c) for c in expr.schema.names] intact_cols = [] else: columns = [expr._get_field(c) for c in utils.to_list(columns)] name_set = set([c.name for c in columns]) intact_cols = [expr._get_field(c) for c in expr.schema.names if c not in name_set] mapped_cols = [c.isnull().ifelse(Scalar(''), c.name + kv_delim + c.astype('string')) for c in columns] reduced_col = reduce(lambda a, b: (b == '').ifelse(a, (a == '').ifelse(b, a + item_delim + b)), mapped_cols) return expr.__getitem__(intact_cols + [reduced_col.rename(kv_name)]) def dropna(expr, how='any', thresh=None, subset=None): """ Return object with labels on given axis omitted where alternately any or all of the data are missing :param DataFrame expr: input DataFrame :param how: can be ‘any’ or ‘all’. If 'any' is specified any NA values are present, drop that label. If 'all' is specified and all values are NA, drop that label. :param thresh: require that many non-NA values :param subset: Labels along other axis to consider, e.g. if you are dropping rows these would be a list of columns to include :return: DataFrame """ if subset is None: subset = [expr._get_field(c) for c in expr.schema.names] else: subset = [expr._get_field(c) for c in utils.to_list(subset)] if not subset: raise ValueError('Illegal subset is provided.') if thresh is None: thresh = len(subset) if how == 'any' else 1 sum_exprs = reduce(operator.add, (s.notna().ifelse(1, 0) for s in subset)) return expr.filter(sum_exprs >= thresh) def fillna(expr, value=None, method=None, subset=None): """ Fill NA/NaN values using the specified method :param DataFrame expr: input DataFrame :param method: can be ‘backfill’, ‘bfill’, ‘pad’, ‘ffill’ or None :param value: value to fill into :param subset: Labels along other axis to consider. :return: DataFrame """ col_dict = OrderedDict([(c, expr._get_field(c)) for c in expr.schema.names]) if subset is None: sel_col_names = expr.schema.names else: # when c is in expr._fields, _get_field may do substitution which will cause error subset = (c.copy() if isinstance(c, Expr) else c for c in utils.to_list(subset)) sel_col_names = [expr._get_field(c).name for c in subset] if method is not None and value is not None: raise ValueError('The argument `method` is not compatible with `value`.') if method is None and value is None: raise ValueError('You should supply at least one argument in `method` and `value`.') if method is not None and method not in ('backfill', 'bfill', 'pad', 'ffill'): raise ValueError('Method value %s is illegal.' % str(method)) if method in ('backfill', 'bfill'): sel_cols = list(reversed(sel_col_names)) else: sel_cols = sel_col_names if method is None: for n in sel_col_names: e = col_dict[n] col_dict[n] = e.isna().ifelse(value, e).rename(n) return expr.select(list(col_dict.values())) else: names = list(col_dict.keys()) typs = list(c.dtype.name for c in col_dict.values()) @output(names, typs) def mapper(row): last_valid = None update_dict = dict() import math try: import numpy as np except ImportError: np = None def isnan(v): if v is None: return True if np is not None: try: return np.isnan(v) except TypeError: pass try: return math.isnan(v) except TypeError: return False for n in sel_cols: old_val = getattr(row, n) if old_val is None or isnan(old_val): if last_valid is not None: update_dict[n] = last_valid else: last_valid = old_val yield row.replace(**update_dict) return expr.map_reduce(mapper) def ffill(expr, subset=None): """ Fill NA/NaN values with the forward method. Equivalent to fillna(method='ffill'). :param DataFrame expr: input DataFrame. :param subset: Labels along other axis to consider. :return: DataFrame """ return expr.fillna(method='ffill', subset=subset) def bfill(expr, subset=None): """ Fill NA/NaN values with the backward method. Equivalent to fillna(method='bfill'). :param DataFrame expr: input DataFrame. :param subset: Labels along other axis to consider. :return: DataFrame """ return expr.fillna(method='bfill', subset=subset) class AppendIDCollectionExpr(CollectionExpr): _args = '_input', '_id_col' node_name = 'AppendID' def _init(self, *args, **kwargs): from .element import _scalar for attr in self._args[1:]: self._init_attr(attr, None) super(AppendIDCollectionExpr, self)._init(*args, **kwargs) self._validate() self._id_col = _scalar(self._id_col) self._schema = TableSchema.from_lists( self._input.schema.names + [self._id_col.value], self._input.schema.types + [types.int64], ) def _validate(self): if self._id_col in self._input.schema: raise ExpressionError('ID column already exists in current data frame.') @property def input(self): return self._input def accept(self, visitor): return visitor.visit_append_id(self) def _append_id(expr, id_col='append_id'): return AppendIDCollectionExpr(_input=expr, _id_col=id_col) def append_id(expr, id_col='append_id'): """ Append an ID column to current column to form a new DataFrame. :param str id_col: name of appended ID field. :return: DataFrame with ID field :rtype: DataFrame """ if hasattr(expr, '_xflow_append_id'): return expr._xflow_append_id(id_col) else: return _append_id(expr, id_col) class SplitCollectionExpr(CollectionExpr): _args = '_input', '_frac', '_seed', '_split_id' node_name = 'Split' def _init(self, *args, **kwargs): from .element import _scalar for attr in self._args[1:]: self._init_attr(attr, None) super(SplitCollectionExpr, self)._init(*args, **kwargs) self._frac = _scalar(self._frac) self._seed = _scalar(self._seed, types.int32) self._split_id = _scalar(self._split_id, types.int32) self._schema = self._input.schema @property def input(self): return self._input def accept(self, visitor): return visitor.visit_split(self) def _split(expr, frac, seed=None): seed = seed or int(time.time()) return ( SplitCollectionExpr(_input=expr, _frac=frac, _seed=seed, _split_id=0), SplitCollectionExpr(_input=expr, _frac=frac, _seed=seed, _split_id=1), ) def split(expr, frac, seed=None): """ Split the current column into two column objects with certain ratio. :param float frac: Split ratio :return: two split DataFrame objects """ if hasattr(expr, '_xflow_split'): return expr._xflow_split(frac, seed=seed) else: return _split(expr, frac, seed=seed) def applymap(expr, func, rtype=None, resources=None, columns=None, excludes=None, args=(), **kwargs): """ Call func on each element of this collection. :param func: lambda, function, :class:`odps.models.Function`, or str which is the name of :class:`odps.models.Funtion` :param rtype: if not provided, will be the dtype of this sequence :param columns: columns to apply this function on :param excludes: columns to skip when applying the function :return: a new collection :Example: >>> df.applymap(lambda x: x + 1) """ if columns is not None and excludes is not None: raise ValueError('`columns` and `excludes` cannot be provided at the same time.') if not columns: excludes = excludes or [] if isinstance(excludes, six.string_types): excludes = [excludes] excludes = set([c if isinstance(c, six.string_types) else c.name for c in excludes]) columns = set([c for c in expr.schema.names if c not in excludes]) else: if isinstance(columns, six.string_types): columns = [columns] columns = set([c if isinstance(c, six.string_types) else c.name for c in columns]) mapping = [expr[c] if c not in columns else expr[c].map(func, rtype=rtype, resources=resources, args=args, **kwargs) for c in expr.schema.names] return expr.select(*mapping) _collection_methods = dict( sort_values=sort_values, sort=sort_values, distinct=distinct, apply=apply, reshuffle=reshuffle, map_reduce=map_reduce, sample=sample, __sample=__df_sample, pivot=pivot, melt=melt, pivot_table=pivot_table, extract_kv=extract_kv, to_kv=to_kv, dropna=dropna, fillna=fillna, ffill=ffill, bfill=bfill, min_max_scale=min_max_scale, std_scale=std_scale, _append_id=_append_id, append_id=append_id, _split=_split, split=split, applymap=applymap, ) _sequence_methods = dict( unique=unique ) utils.add_method(CollectionExpr, _collection_methods) utils.add_method(SequenceExpr, _sequence_methods)