odps/df/expr/collections.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import json
import re
import time
import uuid
from collections import namedtuple, OrderedDict
from .expressions import *
from .dynamic import DynamicCollectionExpr
from .arithmetic import Negate
from . import utils
from ..types import validate_data_type, string, DynamicSchema
from ..utils import FunctionWrapper, output
from ...models import TableSchema
from ...compat import six, lkeys, lvalues, reduce
from ...utils import str_to_kv
class SortedColumn(SequenceExpr):
"""
Note: we intentionally do not inherit from Column
"""
__slots__ = '_ascending',
_args = '_input',
@property
def input(self):
return self._input
def accept(self, visitor):
return visitor.visit_sort_column(self)
class SortedExpr(Expr):
def _init(self, *args, **kwargs):
super(SortedExpr, self)._init(*args, **kwargs)
if isinstance(self._ascending, bool):
self._ascending = tuple([self._ascending] * len(self._sorted_fields))
if len(self._sorted_fields) != len(self._ascending):
raise ValueError('Length of `ascending` must be 1 or equal to the number of sorted fields')
sorted_fields = list()
for i, field in enumerate(self._sorted_fields):
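# a negated field (e.g. -df.id) is interpreted as sorting by that field in descending order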
if isinstance(field, Negate) and isinstance(field._input, SequenceExpr):
field = field._input
ascending = list(self._ascending)
ascending[i] = False
self._ascending = tuple(ascending)
attr_values = dict((attr, getattr(field, attr, None)) for attr in utils.get_attrs(field))
attr_values['_ascending'] = False
sorted_fields.append(SortedColumn(**attr_values))
elif isinstance(field, SortedColumn):
sorted_fields.append(field)
elif isinstance(field, SequenceExpr):
column = SortedColumn(_input=field, _name=field.name, _source_name=field.source_name,
_data_type=field._data_type,
_source_data_type=field._source_data_type,
_ascending=self._ascending[i])
sorted_fields.append(column)
elif isinstance(field, Scalar):
column = SortedColumn(_input=field, _name=field.name, _source_name=field.source_name,
_data_type=field._value_type,
_source_data_type=field._source_value_type,
_ascending=self._ascending[i])
sorted_fields.append(column)
else:
from .groupby import SequenceGroupBy
if isinstance(field, SequenceGroupBy):
field = field.name
assert isinstance(field, six.string_types)
sorted_fields.append(
SortedColumn(self._input[field], _name=field,
_data_type=self._input._schema[field].type, _ascending=self._ascending[i]))
self._sorted_fields = sorted_fields
class SortedCollectionExpr(SortedExpr, CollectionExpr):
__slots__ = '_ascending',
_args = '_input', '_sorted_fields'
node_name = 'SortBy'
def iter_args(self):
for it in zip(['collection', 'keys'], self.args):
yield it
@property
def input(self):
return self._input
def rebuild(self):
rebuilt = super(SortedCollectionExpr, self).rebuild()
rebuilt._schema = self.input.schema
return rebuilt
def accept(self, visitor):
visitor.visit_sort(self)
def sort_values(expr, by, ascending=True):
"""
Sort the collection by values. `sort` is an alias for `sort_values`
:param expr: collection
:param by: the sequence or sequences to sort
:param ascending: Sort ascending vs. descending. Specify a list for multiple sort orders.
If this is a list of bools, it must match the length of `by`
:return: Sorted collection
:Example:
>>> df.sort_values(['name', 'id']) # 1
>>> df.sort(['name', 'id'], ascending=False) # 2
>>> df.sort(['name', 'id'], ascending=[False, True]) # 3
>>> df.sort([-df.name, df.id]) # 4, equal to #3
"""
if not isinstance(by, list):
by = [by, ]
by = [it(expr) if inspect.isfunction(it) else it for it in by]
return SortedCollectionExpr(expr, _sorted_fields=by, _ascending=ascending,
_schema=expr._schema)
class DistinctCollectionExpr(CollectionExpr):
__slots__ = '_all',
_args = '_input', '_unique_fields'
node_name = 'Distinct'
def _init(self, *args, **kwargs):
super(DistinctCollectionExpr, self)._init(*args, **kwargs)
if self._unique_fields:
self._unique_fields = list(self._input._get_field(field)
for field in self._unique_fields)
if not hasattr(self, '_schema'):
names = [field.name for field in self._unique_fields]
types = [field._data_type for field in self._unique_fields]
self._schema = TableSchema.from_lists(names, types)
else:
self._unique_fields = list(self._input._get_field(field)
for field in self._input._schema.names)
self._schema = self._input._schema
def iter_args(self):
for it in zip(['collection', 'distinct'], self.args):
yield it
@property
def input(self):
return self._input
def rebuild(self):
return self._input.distinct(self._unique_fields if not self._all else [])
def accept(self, visitor):
return visitor.visit_distinct(self)
def distinct(expr, on=None, *ons):
"""
Get collection with duplicate rows removed, optionally only considering certain columns
:param expr: collection
:param on: sequence or sequences
:return: distinct collection
:Example:
>>> df.distinct(['name', 'id'])
>>> df['name', 'id'].distinct()
"""
on = on or list()
if not isinstance(on, list):
on = [on, ]
on = on + list(ons)
on = [it(expr) if inspect.isfunction(it) else it for it in on]
return DistinctCollectionExpr(expr, _unique_fields=on, _all=(len(on) == 0))
def unique(expr):
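"""
Get distinct values of a sequence. Equivalent to calling `distinct` on the
parent collection and then selecting this column.
:param expr: sequence
:return: sequence with duplicate values removed
:Example:
>>> df['name'].unique()
"""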
if isinstance(expr, SequenceExpr):
collection = next(it for it in expr.traverse(top_down=True, unique=True)
if isinstance(it, CollectionExpr))
return collection.distinct(expr)[expr.name]
class SampledCollectionExpr(CollectionExpr):
_args = '_input', '_n', '_frac', '_parts', '_i', '_sampled_fields', '_replace', \
'_weights', '_strata', '_random_state'
node_name = 'Sample'
def _init(self, *args, **kwargs):
for attr in self._args[1:]:
self._init_attr(attr, None)
super(SampledCollectionExpr, self)._init(*args, **kwargs)
if not isinstance(self._n, dict):
self._n = self._scalar(self._n)
else:
self._n = self._scalar(json.dumps(self._n))
if not isinstance(self._frac, dict):
self._frac = self._scalar(self._frac)
else:
self._frac = self._scalar(json.dumps(self._frac))
self._parts = self._scalar(self._parts)
self._i = self._scalar(self._i)
self._replace = self._scalar(self._replace)
self._weights = self._scalar(self._weights)
self._strata = self._scalar(self._strata)
self._random_state = self._scalar(self._random_state)
def _scalar(self, val):
if val is None:
return
if isinstance(val, Scalar):
return val
if isinstance(val, tuple):
return tuple(self._scalar(it) for it in val)
else:
return Scalar(_value=val)
@property
def input(self):
return self._input
def rebuild(self):
rebuilt = super(SampledCollectionExpr, self).rebuild()
rebuilt._schema = self.input.schema
return rebuilt
def accept(self, visitor):
return visitor.visit_sample(self)
def __df_sample(expr, parts=None, columns=None, i=None, n=None, frac=None, replace=False,
weights=None, strata=None, random_state=None):
if columns:
columns = expr.select(columns)._fields
return SampledCollectionExpr(_input=expr, _parts=parts, _i=i, _sampled_fields=columns, _n=n,
_frac=frac, _weights=weights, _strata=strata, _random_state=random_state,
_replace=replace, _schema=expr.schema)
def sample(expr, parts=None, columns=None, i=None, n=None, frac=None, replace=False,
weights=None, strata=None, random_state=None):
"""
Sample collection.
:param expr: collection
:param parts: how many parts to hash
:param columns: the columns to sample
:param i: the part to sample out, can be a list of parts, must be from 0 to parts-1
:param n: how many rows to sample. If `strata` is specified, `n` should be a dict with values in the strata column as dictionary keys and corresponding sample size as values
:param frac: the fraction of rows to sample. If `strata` is specified, `frac` should be a dict with values in the strata column as dictionary keys and the corresponding sample fractions as values
:param replace: whether to perform replace sampling
:param weights: the column name of weights
:param strata: the name of strata column
:param random_state: the random seed when performing sampling
:return: collection
Note that n, frac, replace, weights, strata and random_state can only be used under Pandas DataFrames or
XFlow.
:Example:
Sampling with parts:
>>> df.sample(parts=1)
>>> df.sample(parts=5, i=0)
>>> df.sample(parts=10, columns=['name'])
Sampling with fraction or weights, replacement option can be specified:
>>> df.sample(n=100)
>>> df.sample(frac=0.1)
>>> df.sample(frac=0.1, replace=True)
Sampling with weight column:
>>> df.sample(n=100, weights='weight_col')
>>> df.sample(n=100, weights='weight_col', replace=True)
Stratified sampling. Note that currently we do not support stratified sampling with replacement.
>>> df.sample(strata='category', frac={'Iris Setosa': 0.5, 'Iris Versicolour': 0.4})
"""
if isinstance(expr, CollectionExpr):
if n is None and frac is None and parts is None:
raise ExpressionError('Either n or frac or parts should be provided')
if i is not None and parts is None:
raise ExpressionError('`parts` arg is required when `i` arg is specified')
if len([arg for arg in (n, frac, parts) if arg is not None]) > 1:
raise ExpressionError('You can specify only one of `n`, `frac` and `parts`')
if strata is None and n is not None and frac is not None:
# strata can specify different types of strategies on different columns
raise ExpressionError('You cannot specify `n` and `frac` at the same time.')
if weights is not None and strata is not None:
raise ExpressionError('You cannot specify `weights` and `strata` at the same time.')
if strata is not None:
if frac is not None and not isinstance(frac, (six.string_types, dict)):
raise ExpressionError('`frac` should be a k-v string or a dictionary object.')
if isinstance(frac, six.string_types):
frac = str_to_kv(frac, float)
if n is not None and not isinstance(n, (six.string_types, dict)):
raise ExpressionError('`n` should be a k-v string or a dictionary object.')
if isinstance(n, six.string_types):
n = str_to_kv(n, int)
for val in six.itervalues(frac or dict()):
if val < 0 or val > 1:
raise ExpressionError('Values in `frac` must be between 0 and 1')
if n is not None and frac is not None:
collides = set(six.iterkeys(n)).intersection(set(six.iterkeys(frac)))
if collides:
raise ExpressionError('Keys in `frac` and `n` collide with each other.')
else:
if frac is not None and (not isinstance(frac, (six.integer_types, float)) or frac < 0 or frac > 1):
raise ExpressionError('`frac` must be between 0 and 1')
if parts is not None:
if i is None:
i = (0, )
elif isinstance(i, list):
i = tuple(i)
elif not isinstance(i, tuple):
i = (i, )
for it in i:
if it >= parts or it < 0:
raise ExpressionError('`i` should be non-negative integers less than `parts`')
elif not options.df.use_xflow_sample and not replace and weights is None and strata is None:
if frac is not None and frac < 0.01:
raise ValueError(
"Does not support sampling less than 1%. Try sampling by count or "
"set options.df.use_xflow_sample to True."
)
elif hasattr(expr, '_xflow_sample'):
return expr._xflow_sample(n=n, frac=frac, replace=replace, weights=weights, strata=strata,
random_state=random_state)
return expr.__sample(parts=parts, columns=columns, i=i, n=n, frac=frac, replace=replace,
weights=weights, strata=strata, random_state=random_state)
class RowAppliedCollectionExpr(CollectionExpr):
__slots__ = '_func', '_func_args', '_func_kwargs', '_close_func', \
'_resources', '_raw_inputs', '_lateral_view', '_keep_nulls', \
'_cu_request'
_args = '_input', '_fields', '_collection_resources'
node_name = 'Apply'
def _init(self, *args, **kwargs):
self._init_attr('_raw_inputs', None)
self._init_attr('_lateral_view', False)
self._init_attr('_cu_request', None)
super(RowAppliedCollectionExpr, self)._init(*args, **kwargs)
@property
def input(self):
return self._input
@property
def fields(self):
return self._fields
@property
def input_types(self):
return [f.dtype for f in self._fields]
@property
def raw_input_types(self):
if self._raw_inputs:
return [f.dtype for f in self._raw_inputs]
return self.input_types
@property
def func(self):
return self._func
@func.setter
def func(self, f):
self._func = f
def accept(self, visitor):
return visitor.visit_apply_collection(self)
def _apply_horizontal(expr, func, names=None, types=None, resources=None,
collection_resources=None, keep_nulls=False, cu_request=None,
args=(), **kwargs):
if isinstance(func, FunctionWrapper):
names = names or func.output_names
types = types or func.output_types
func = func._func
if names is not None:
if isinstance(names, list):
names = tuple(names)
elif isinstance(names, six.string_types):
names = (names,)
if names is None:
raise ValueError(
'Apply on rows producing multiple values requires all output column names, '
'for instance, df.apply(func, axis=1, names=["A", "B"], types=["float", "float"]). '
'See https://pyodps.readthedocs.io/zh_CN/latest/df-sort-distinct-apply.html#dfudtfapp '
'for more information.'
)
tps = (string,) * len(names) if types is None else tuple(validate_data_type(t) for t in types)
schema = TableSchema.from_lists(names, tps)
collection_resources = collection_resources or \
utils.get_collection_resources(resources)
return RowAppliedCollectionExpr(_input=expr, _func=func, _func_args=args,
_func_kwargs=kwargs, _schema=schema,
_fields=[expr[n] for n in expr.schema.names],
_keep_nulls=keep_nulls, _resources=resources,
_collection_resources=collection_resources,
_cu_request=cu_request)
def apply(expr, func, axis=0, names=None, types=None, reduce=False,
resources=None, keep_nulls=False, cu_request=None, args=(), **kwargs):
"""
Apply a function to each row when axis=1, or to each column when axis=0.
:param expr:
:param func: function to apply
:param axis: 1 to apply the function to each row, 0 to apply it to each column
:param names: output names
:param types: output types
:param reduce: if True, return a sequence, otherwise return a collection
:param resources: resources to read
:param keep_nulls: if True, keep rows producing empty results; only works in lateral views
:param args: args for function
:param kwargs: kwargs for function
:return:
:Example:
Apply a function to a row:
>>> from odps.df import output
>>>
>>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
>>> def handle(row):
>>> yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
>>> yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
>>>
>>> iris.apply(handle, axis=1).count()
Apply a function to a column:
>>> class Agg(object):
>>>
>>> def buffer(self):
>>> return [0.0, 0]
>>>
>>> def __call__(self, buffer, val):
>>> buffer[0] += val
>>> buffer[1] += 1
>>>
>>> def merge(self, buffer, pbuffer):
>>> buffer[0] += pbuffer[0]
>>> buffer[1] += pbuffer[1]
>>>
>>> def getvalue(self, buffer):
>>> if buffer[1] == 0:
>>> return 0.0
>>> return buffer[0] / buffer[1]
>>>
>>> iris.exclude('name').apply(Agg)
"""
if types is None and "rtype" in kwargs:
types = kwargs.pop("rtype")
if not isinstance(expr, CollectionExpr):
return
if isinstance(func, FunctionWrapper):
names = names or func.output_names
types = types or func.output_types
func = func._func
if axis == 0:
types = types or expr.schema.types
types = [validate_data_type(t) for t in types]
fields = [expr[n].agg(func, rtype=t, resources=resources)
for n, t in zip(expr.schema.names, types)]
if names:
fields = [f.rename(n) for f, n in zip(fields, names)]
else:
names = [f.name for f in fields]
return Summary(_input=expr, _fields=fields, _schema=TableSchema.from_lists(names, types))
else:
collection_resources = utils.get_collection_resources(resources)
if types is not None:
if isinstance(types, list):
types = tuple(types)
elif isinstance(types, six.string_types):
types = (types,)
types = tuple(validate_data_type(t) for t in types)
if reduce:
from .element import MappedExpr
from ..backends.context import context
if names is not None and len(names) > 1:
raise ValueError('When `reduce` is True, at most one name can be specified')
name = names[0] if names is not None else None
if not types and kwargs.get('rtype', None) is not None:
types = [kwargs.pop('rtype')]
tp = types[0] if types is not None else (utils.get_annotation_rtype(func) or string)
if not context.is_cached(expr) and (hasattr(expr, '_fields') and expr._fields is not None):
inputs = [e.copy_tree(stop_cond=lambda x: any(i is expr.input for i in x.children()))
for e in expr._fields]
else:
inputs = [expr[n] for n in expr.schema.names]
return MappedExpr(_func=func, _func_args=args, _func_kwargs=kwargs,
_name=name, _data_type=tp, _inputs=inputs, _multiple=True,
_resources=resources, _cu_request=cu_request,
_collection_resources=collection_resources)
else:
return _apply_horizontal(expr, func, names=names, types=types, resources=resources,
collection_resources=collection_resources, keep_nulls=keep_nulls,
cu_request=cu_request, args=args, **kwargs)
class ReshuffledCollectionExpr(CollectionExpr):
_args = '_input', '_by', '_sort_fields'
node_name = 'Reshuffle'
def _init(self, *args, **kwargs):
from .groupby import BaseGroupBy, SortedGroupBy
self._init_attr('_sort_fields', None)
super(ReshuffledCollectionExpr, self)._init(*args, **kwargs)
if isinstance(self._input, BaseGroupBy):
if isinstance(self._input, SortedGroupBy):
self._sort_fields = self._input._sorted_fields
self._by = self._input._by
self._input = self._input._input
@property
def fields(self):
return self._by + (self._sort_fields or list())
@property
def input(self):
return self._input
def iter_args(self):
arg_names = ['collection', 'bys', 'sort']
for it in zip(arg_names, self.args):
yield it
def accept(self, visitor):
return visitor.visit_reshuffle(self)
def reshuffle(expr, by=None, sort=None, ascending=True):
"""
Reshuffle data.
:param expr:
:param by: the sequence or scalar to shuffle by, RandomScalar by default
:param sort: the sequence or scalar to sort by.
:param ascending: True if ascending else False
:return: collection
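:Example:
The examples below are illustrative and assume a DataFrame `df` with columns `name` and `id`:
>>> df.reshuffle()
>>> df.reshuffle('name')
>>> df.reshuffle('name', sort='id', ascending=False)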
"""
by = by or RandomScalar()
grouped = expr.groupby(by)
if sort:
grouped = grouped.sort_values(sort, ascending=ascending)
return ReshuffledCollectionExpr(_input=grouped, _schema=expr._schema)
def map_reduce(expr, mapper=None, reducer=None, group=None, sort=None, ascending=True,
combiner=None, combiner_buffer_size=1024,
mapper_output_names=None, mapper_output_types=None, mapper_resources=None, mapper_cu=None,
reducer_output_names=None, reducer_output_types=None, reducer_resources=None, reducer_cu=None):
"""
MapReduce API. At least one of mapper and reducer should be provided.
:param expr:
:param mapper: mapper function or class
:param reducer: reducer function or class
:param group: the keys to group after mapper
:param sort: the keys to sort after mapper
:param ascending: True if ascending else False
:param combiner: combiner function or class; the combiner's output schema should be identical to the mapper's
:param combiner_buffer_size: combiner's buffer size, 1024 as default
:param mapper_output_names: mapper's output names
:param mapper_output_types: mapper's output types
:param mapper_resources: the resources for mapper
:param reducer_output_names: reducer's output names
:param reducer_output_types: reducer's output types
:param reducer_resources: the resources for reducer
:return:
:Example:
>>> from odps.df import output
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def mapper(row):
>>> for word in row[0].split():
>>> yield word.lower(), 1
>>>
>>> @output(['word', 'cnt'], ['string', 'int'])
>>> def reducer(keys):
>>> cnt = [0]
>>> def h(row, done): # `done` indicates that all rows of the current key have been processed
>>> cnt[0] += row.cnt
>>> if done:
>>> yield keys.word, cnt[0]
>>> return h
>>>
>>> words_df.map_reduce(mapper, reducer, group='word')
"""
def _adjust_partial(fun):
if isinstance(fun, functools.partial) and isinstance(fun.func, FunctionWrapper):
wrapped_fun = fun.func
partial_fun = functools.partial(wrapped_fun._func, *fun.args, **fun.keywords)
ret_fun = FunctionWrapper(partial_fun)
ret_fun.output_names = wrapped_fun.output_names
ret_fun.output_types = wrapped_fun.output_types
return ret_fun
else:
return fun
def conv(l):
if l is None:
return
if isinstance(l, tuple):
l = list(l)
elif not isinstance(l, list):
l = [l, ]
return l
def gen_name():
return 'pyodps_field_%s' % str(uuid.uuid4()).replace('-', '_')
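# Wrap the user reducer into a row-wise callable. Rows arrive sorted by the
# `group` keys; the wrapper detects key changes so that each user reducer
# instance only sees the rows of a single key, and flushes the last key in close().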
def _gen_actual_reducer(reducer, group):
class ActualReducer(object):
def __init__(self, resources=None):
self._func = reducer
self._curr = None
self._prev_rows = None
self._key_named_tuple = namedtuple('NamedKeys', group)
self._resources = resources
self._f = None
def _is_generator_function(self, f):
if inspect.isgeneratorfunction(f):
return True
elif hasattr(f, '__call__') and inspect.isgeneratorfunction(f.__call__):
return True
return False
def __call__(self, row):
key = tuple(getattr(row, n) for n in group)
k = self._key_named_tuple(*key)
if self._prev_rows is not None:
key_consumed = self._curr != key
if self._is_generator_function(self._f):
for it in self._f(self._prev_rows, key_consumed):
yield it
else:
res = self._f(self._prev_rows, key_consumed)
if res:
yield res
self._prev_rows = row
if self._curr is None or self._curr != key:
self._curr = key
if self._resources and self._f is None:
self._func = self._func(self._resources)
self._f = self._func(k)
def close(self):
if self._prev_rows and self._curr:
if self._is_generator_function(self._f):
for it in self._f(self._prev_rows, True):
yield it
else:
res = self._f(self._prev_rows, True)
if res:
yield res
self._prev_rows = None
return ActualReducer
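# Wrap the user mapper so that its output rows are buffered, sorted by the
# group / sort keys and pre-aggregated with the combiner before being emitted,
# which reduces the amount of data shuffled to the reducers.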
def _gen_combined_mapper(mapper, combiner, names, group, sort, ascending,
buffer_size, mapper_resources=None):
mapper = mapper if not isinstance(mapper, FunctionWrapper) else mapper._func
sort_indexes = [names.index(s) for s in sort]
if isinstance(ascending, bool):
ascending = [ascending] * len(sort)
class CombinedMapper(object):
def __init__(self, resources=None):
if mapper_resources:
self.f = mapper(resources)
elif inspect.isclass(mapper):
self.f = mapper()
else:
self.f = mapper
self.buffer = list()
if inspect.isfunction(self.f):
self.is_generator = inspect.isgeneratorfunction(self.f)
else:
self.is_generator = inspect.isgeneratorfunction(self.f.__call__)
def _cmp_to_key(self, cmp):
"""Convert a cmp= function into a key= function"""
class K(object):
def __init__(self, obj):
self.obj = obj
def __lt__(self, other):
return cmp(self.obj, other.obj) < 0
def __gt__(self, other):
return cmp(self.obj, other.obj) > 0
def __eq__(self, other):
return cmp(self.obj, other.obj) == 0
def __le__(self, other):
return cmp(self.obj, other.obj) <= 0
def __ge__(self, other):
return cmp(self.obj, other.obj) >= 0
def __ne__(self, other):
return cmp(self.obj, other.obj) != 0
return K
def _combine(self):
def cmp(x, y):
for asc, sort_idx in zip(ascending, sort_indexes):
indict = 1 if asc else -1
if x[sort_idx] > y[sort_idx]:
return indict * 1
elif x[sort_idx] < y[sort_idx]:
return indict * -1
else:
continue
return 0
self.buffer.sort(key=self._cmp_to_key(cmp))
ActualCombiner = _gen_actual_reducer(combiner, group)
ac = ActualCombiner()
named_row = namedtuple('NamedRow', names)
for r in self.buffer:
row = named_row(*r)
for l in ac(row):
yield l
for l in ac.close():
yield l
self.buffer = []
def _handle_output_line(self, line):
if len(self.buffer) >= buffer_size:
for l in self._combine():
yield l
self.buffer.append(line)
def __call__(self, row):
if self.is_generator:
for it in self.f(row):
for l in self._handle_output_line(it):
yield l
else:
for l in self._handle_output_line(self.f(row)):
yield l
def close(self):
if len(self.buffer) > 0:
for l in self._combine():
yield l
return CombinedMapper
mapper = _adjust_partial(mapper)
reducer = _adjust_partial(reducer)
combiner = _adjust_partial(combiner)
if isinstance(mapper, FunctionWrapper):
mapper_output_names = mapper_output_names or mapper.output_names
mapper_output_types = mapper_output_types or mapper.output_types
mapper_output_names = conv(mapper_output_names)
mapper_output_types = conv(mapper_output_types)
if mapper_output_types is not None and mapper_output_names is None:
mapper_output_names = [gen_name() for _ in range(len(mapper_output_types))]
if mapper is None and mapper_output_names is None:
mapper_output_names = expr.schema.names
group = conv(group) or mapper_output_names
sort = sort or tuple()
sort = list(OrderedDict.fromkeys(group + conv(sort)))
if len(sort) > len(group):
ascending = [ascending, ] * (len(sort) - len(group)) \
if isinstance(ascending, bool) else list(ascending)
if len(ascending) != len(sort):
ascending = [True] * len(group) + ascending
if not set(group + sort).issubset(mapper_output_names):
raise ValueError('Fields in `group` and `sort` must be column names of the mapper output')
if mapper is None:
if mapper_output_names and mapper_output_names != expr.schema.names:
raise ExpressionError(
'Null mapper cannot have mapper output names: %s' % mapper_output_names)
if mapper_output_types and mapper_output_types != expr.schema.types:
raise ExpressionError(
'Null mapper cannot have mapper output types: %s' % mapper_output_types)
mapped = expr
if combiner is not None:
raise ValueError('A combiner cannot be provided when the mapper is null')
else:
if combiner is not None:
if isinstance(combiner, FunctionWrapper):
if combiner.output_names and \
combiner.output_names != mapper_output_names:
raise ExpressionError(
'Combiner must have the same output names as the mapper')
if combiner.output_types and \
combiner.output_types != mapper_output_types:
raise ExpressionError(
'Combiner must have the same output types as the mapper')
combiner = combiner._func
mapper = _gen_combined_mapper(mapper, combiner, mapper_output_names,
group, sort, ascending, combiner_buffer_size,
mapper_resources=mapper_resources)
mapped = expr.apply(mapper, axis=1, names=mapper_output_names,
types=mapper_output_types, resources=mapper_resources,
cu_request=mapper_cu)
clustered = mapped.groupby(group).sort(sort, ascending=ascending)
if isinstance(reducer, FunctionWrapper):
reducer_output_names = reducer_output_names or reducer.output_names
reducer_output_types = reducer_output_types or reducer.output_types
reducer = reducer._func
if reducer is None:
if reducer_output_names and reducer_output_names != mapped.schema.names:
raise ExpressionError(
'Null reducer cannot have reducer output names %s' % reducer_output_names)
if reducer_output_types and reducer_output_types != mapped.schema.types:
raise ExpressionError(
'Null reducer cannot have reducer output types %s' % reducer_output_types)
return mapped
ActualReducer = _gen_actual_reducer(reducer, group)
return clustered.apply(ActualReducer, resources=reducer_resources,
names=reducer_output_names, types=reducer_output_types,
cu_request=reducer_cu)
class PivotCollectionExpr(DynamicCollectionExpr):
_args = '_input', '_group', '_columns', '_values'
node_name = 'Pivot'
def _init(self, *args, **kwargs):
self._init_attr('_group', None)
self._init_attr('_columns', None)
self._init_attr('_values', None)
super(PivotCollectionExpr, self)._init(*args, **kwargs)
if not hasattr(self, '_schema'):
self._schema = DynamicSchema.from_lists(
[f.name for f in self._group], [f.dtype for f in self._group]
)
def iter_args(self):
for it in zip(['collection', 'group', 'columns', 'values'], self.args):
yield it
@property
def input(self):
return self._input
def accept(self, visitor):
return visitor.visit_pivot(self)
def pivot(expr, rows, columns, values=None):
"""
Produce a 'pivot' table based on 3 columns of this DataFrame.
Uses unique values from rows / columns and fills with values.
:param expr: collection
:param rows: columns used to make the new collection's grouped rows
:param columns: columns used to make the new collection's columns
:param values: values to use for populating new collection's values
:return: collection
:Example:
>>> df.pivot(rows='id', columns='category')
>>> df.pivot(rows='id', columns='category', values='sale')
>>> df.pivot(rows=['id', 'id2'], columns='category', values='sale')
"""
rows = [expr._get_field(r) for r in utils.to_list(rows)]
columns = [expr._get_field(c) for c in utils.to_list(columns)]
if values:
values = utils.to_list(values)
else:
names = set(c.name for c in rows + columns)
values = [n for n in expr.schema.names if n not in names]
if not values:
raise ValueError('No values found for pivot')
values = [expr._get_field(v) for v in values]
if len(columns) > 1:
raise ValueError('More than one column in `columns` is not supported yet')
return PivotCollectionExpr(_input=expr, _group=rows,
_columns=columns, _values=values)
def melt(expr, id_vars=None, value_vars=None, var_name='variable', value_name='value', ignore_nan=False):
"""
“Unpivots” a DataFrame from wide format to long format, optionally leaving identifier variables set.
This function is useful to massage a DataFrame into a format where one or more columns are identifier
variables (id_vars), while all other columns, considered measured variables (value_vars), are “unpivoted”
to the row axis, leaving just two non-identifier columns, ‘variable’ and ‘value’.
:param expr: collection
:param id_vars: column(s) to use as identifier variables.
:param value_vars: column(s) to unpivot. If not specified, uses all columns that are not set as id_vars.
:param var_name: name to use for the ‘variable’ column. If None it uses frame.columns.name or ‘variable’.
:param value_name: name to use for the ‘value’ column.
:param ignore_nan: whether to ignore NaN values in data.
:return: collection
:Example:
>>> df.melt(id_vars='id', value_vars=['col1', 'col2'])
>>> df.melt(id_vars=['id', 'id2'], value_vars=['col1', 'col2'], var_name='variable')
"""
id_vars = id_vars or []
id_vars = [expr._get_field(r) for r in utils.to_list(id_vars)]
if not value_vars:
id_names = set([c.name for c in id_vars])
value_vars = [expr._get_field(c) for c in expr.schema.names if c not in id_names]
else:
value_vars = [expr._get_field(c) for c in value_vars]
col_type = utils.highest_precedence_data_type(*[c.dtype for c in value_vars])
col_names = [c.name for c in value_vars]
id_names = [r.name for r in id_vars]
names = id_names + [var_name, value_name]
dtypes = [r.dtype for r in id_vars] + [types.string, col_type]
@output(names, dtypes)
def mapper(row):
for cn in col_names:
col_value = getattr(row, cn)
if ignore_nan and col_value is None:
continue
vals = [getattr(row, rn) for rn in id_names]
yield tuple(vals + [cn, col_value])
return expr.map_reduce(mapper)
class PivotTableCollectionExpr(CollectionExpr):
__slots__ = '_agg_func', '_agg_func_names'
_args = '_input', '_group', '_columns', '_values', '_fill_value'
node_name = 'PivotTable'
def _init(self, *args, **kwargs):
for arg in self._args:
self._init_attr(arg, None)
super(PivotTableCollectionExpr, self)._init(*args, **kwargs)
for attr in ('_fill_value', ):
val = getattr(self, attr, None)
if val is not None and not isinstance(val, Scalar):
setattr(self, attr, Scalar(_value=val))
@property
def input(self):
return self._input
@property
def fill_value(self):
if self._fill_value:
return self._fill_value.value
@property
def margins(self):
return self._margins.value
@property
def margins_name(self):
return self._margins_name.value
def accept(self, visitor):
return visitor.visit_pivot(self)
def pivot_table(expr, values=None, rows=None, columns=None, aggfunc='mean',
fill_value=None):
"""
Create a spreadsheet-style pivot table as a DataFrame.
:param expr: collection
:param values (optional): column to aggregate
:param rows: rows to group
:param columns: keys to group by on the pivot table column
:param aggfunc: aggregate function or functions
:param fill_value (optional): value to replace missing values with, default None
:return: collection
:Example:
>>> df
A B C D
0 foo one small 1
1 foo one large 2
2 foo one large 2
3 foo two small 3
4 foo two small 3
5 bar one large 4
6 bar one small 5
7 bar two small 6
8 bar two large 7
>>> table = df.pivot_table(values='D', rows=['A', 'B'], columns='C', aggfunc='sum')
>>> table
A B large_D_sum small_D_sum
0 bar one 4.0 5.0
1 bar two 7.0 6.0
2 foo one 4.0 1.0
3 foo two NaN 6.0
"""
def get_names(iters):
return [r if isinstance(r, six.string_types) else r.name
for r in iters]
def get_aggfunc_name(f):
if isinstance(f, six.string_types):
if '(' in f:
f = re.sub(r' *\( *', '_', f)
f = re.sub(r' *[+\-\*/,] *', '_', f)
f = re.sub(r' *\) *', '', f)
f = f.replace('.', '_')
return f
if isinstance(f, FunctionWrapper):
return f.output_names[0]
return 'aggregation'
if not rows:
raise ValueError('No group keys passed')
rows = utils.to_list(rows)
rows_names = get_names(rows)
rows = [expr._get_field(r) for r in rows]
if isinstance(aggfunc, dict):
agg_func_names = lkeys(aggfunc)
aggfunc = lvalues(aggfunc)
else:
aggfunc = utils.to_list(aggfunc)
agg_func_names = [get_aggfunc_name(af) for af in aggfunc]
if not columns:
if values is None:
values = [n for n in expr.schema.names if n not in rows_names]
else:
values = utils.to_list(values)
values = [expr._get_field(v) for v in values]
names = rows_names
types = [r.dtype for r in rows]
for func, func_name in zip(aggfunc, agg_func_names):
for value in values:
if isinstance(func, six.string_types):
seq = value.eval(func, rewrite=False)
if isinstance(seq, ReprWrapper):
seq = seq()
else:
seq = value.agg(func)
seq = seq.rename('{0}_{1}'.format(value.name, func_name))
names.append(seq.name)
types.append(seq.dtype)
schema = TableSchema.from_lists(names, types)
return PivotTableCollectionExpr(_input=expr, _group=rows, _values=values,
_fill_value=fill_value, _schema=schema,
_agg_func=aggfunc, _agg_func_names=agg_func_names)
else:
columns = [expr._get_field(c) for c in utils.to_list(columns)]
if values:
values = utils.to_list(values)
else:
names = set(c.name for c in rows + columns)
values = [n for n in expr.schema.names if n not in names]
if not values:
raise ValueError('No values found for pivot_table')
values = [expr._get_field(v) for v in values]
if len(columns) > 1:
raise ValueError('More than one column in `columns` is not supported yet')
schema = DynamicSchema.from_lists(rows_names, [r.dtype for r in rows])
base_tp = PivotTableCollectionExpr
tp = type(base_tp.__name__, (DynamicCollectionExpr, base_tp), dict())
return tp(_input=expr, _group=rows, _values=values,
_columns=columns, _agg_func=aggfunc,
_fill_value=fill_value, _schema=schema,
_agg_func_names=agg_func_names)
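# Shared implementation of min_max_scale / std_scale: compute per-column
# aggregates (globally or per group), broadcast them back via a mapjoin,
# then apply `scale_fun` to every numeric column.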
def _scale_values(expr, columns, agg_fun, scale_fun, preserve=False, suffix='_scaled', group=None):
from ..types import Float, Integer
time_suffix = str(int(time.time()))
if group is not None:
group = utils.to_list(group)
group = [expr._get_field(c).name if isinstance(c, Column) else c for c in group]
if columns is None:
if group is None:
columns = expr.schema.names
else:
columns = [n for n in expr.schema.names if n not in group]
else:
columns = utils.to_list(columns)
columns = [expr._get_field(v) for v in columns]
numerical_cols = [col.name for col in columns if isinstance(col.data_type, (Float, Integer))]
agg_cols = []
for col_name in numerical_cols:
agg_cols.extend(agg_fun(expr, col_name))
if group is None:
# make a fake constant column to join
extra_col = 'idx_col_' + time_suffix
join_cols = [extra_col]
stats_df = expr.__getitem__([Scalar(1).rename(extra_col)] + agg_cols)
mapped = expr[expr, Scalar(1).rename(extra_col)]
else:
extra_col = None
join_cols = group
stats_df = expr.groupby(join_cols).agg(*agg_cols)
mapped = expr
joined = mapped.join(stats_df, on=join_cols, mapjoin=True)
if extra_col is not None:
joined = joined.exclude(extra_col)
if preserve:
norm_cols = [dt.name for dt in expr.dtypes]
norm_cols.extend([scale_fun(joined, dt.name).rename(dt.name + suffix)
for dt in expr.dtypes if dt.name in numerical_cols])
else:
norm_cols = [scale_fun(joined, dt.name).rename(dt.name)
if dt.name in numerical_cols else getattr(joined, dt.name)
for dt in expr.dtypes]
return joined.__getitem__(norm_cols)
def min_max_scale(expr, columns=None, feature_range=(0, 1), preserve=False, suffix='_scaled', group=None):
"""
Rescale a data frame by max / min values, i.e., (X - min(X)) / (max(X) - min(X))
:param DataFrame expr: input DataFrame
:param feature_range: the target range to scale values into, i.e., v * (b - a) + a
:param bool preserve: determine whether original data should be kept. If True, scaled columns will be appended to the data frame with `suffix` added to their names
:param columns: column names to scale. If set to None, float- or int-typed columns will be scaled unless specified as group columns.
:param group: determine scale groups. Scaling will be done in each group separately.
:param str suffix: column suffix to be appended to the scaled columns.
:return: scaled data frame
:rtype: DataFrame
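:Example:
Illustrative usage, assuming a numeric column `fid` and a group column `name`:
>>> df.min_max_scale(columns=['fid'])
>>> df.min_max_scale(columns=['fid'], feature_range=(-1, 1))
>>> df.min_max_scale(columns=['fid'], group='name')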
"""
time_suffix = str(int(time.time()))
def calc_agg(expr, col):
return [
getattr(expr, col).min().rename(col + '_min_' + time_suffix),
getattr(expr, col).max().rename(col + '_max_' + time_suffix),
]
def do_scale(expr, col):
f_min, f_max = feature_range
r = getattr(expr, col + '_max_' + time_suffix) - getattr(expr, col + '_min_' + time_suffix)
scaled = (r == 0).ifelse(Scalar(0), (getattr(expr, col) - getattr(expr, col + '_min_' + time_suffix)) / r)
return scaled * (f_max - f_min) + f_min
return _scale_values(expr, columns, calc_agg, do_scale, preserve=preserve, suffix=suffix, group=group)
def std_scale(expr, columns=None, with_means=True, with_std=True, preserve=False, suffix='_scaled', group=None):
"""
Rescale a data frame by mean and standard deviation.
:param DataFrame expr: Input DataFrame
:param bool with_means: Determine whether means will be subtracted from the output
:param bool with_std: Determine whether the output will be divided by standard deviations
:param bool preserve: Determine whether original data should be kept. If True, scaled columns will be appended to the data frame with `suffix` added to their names
:param columns: Column names to scale. If set to None, float- or int-typed columns will be scaled unless specified as group columns.
:param group: determine scale groups. Scaling will be done in each group separately.
:param str suffix: column suffix to be appended to the scaled columns.
:return: scaled data frame
:rtype: DataFrame
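:Example:
Illustrative usage, assuming a numeric column `fid`:
>>> df.std_scale(columns=['fid'])
>>> df.std_scale(columns=['fid'], with_means=False)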
"""
time_suffix = str(int(time.time()))
def calc_agg(expr, col):
return [
getattr(expr, col).mean().rename(col + '_mean_' + time_suffix),
getattr(expr, col).std(ddof=0).rename(col + '_std_' + time_suffix),
]
def do_scale(expr, col):
c = getattr(expr, col)
mean_expr = getattr(expr, col + '_mean_' + time_suffix)
if with_means:
c = c - mean_expr
mean_expr = Scalar(0)
if with_std:
std_expr = getattr(expr, col + '_std_' + time_suffix)
c = (std_expr == 0).ifelse(mean_expr, c / getattr(expr, col + '_std_' + time_suffix))
return c
return _scale_values(expr, columns, calc_agg, do_scale, preserve=preserve, suffix=suffix, group=group)
class ExtractKVCollectionExpr(DynamicCollectionExpr):
__slots__ = '_column_type',
_args = '_input', '_columns', '_intact', '_kv_delimiter', '_item_delimiter', '_default'
node_name = 'ExtractKV'
def _init(self, *args, **kwargs):
from .element import _scalar
for attr in self._args[1:]:
self._init_attr(attr, None)
super(ExtractKVCollectionExpr, self)._init(*args, **kwargs)
self._kv_delimiter = _scalar(self._kv_delimiter)
self._item_delimiter = _scalar(self._item_delimiter)
self._default = _scalar(self._default)
@property
def input(self):
return self._input
def accept(self, visitor):
visitor.visit_extract_kv(self)
def extract_kv(expr, columns=None, kv_delim=':', item_delim=',', dtype='float', fill_value=None):
"""
Extract values in key-value represented columns into standalone columns. New column names will
be the name of the key-value column followed by an underscore and the key.
:param DataFrame expr: input DataFrame
:param columns: the key-value columns to be extracted.
:param str kv_delim: delimiter between key and value.
:param str item_delim: delimiter between key-value pairs.
:param str dtype: type of value columns to generate.
:param fill_value: default value for missing key-value pairs.
:return: extracted data frame
:rtype: DataFrame
:Example:
>>> df
name kv
0 name1 k1=1.0,k2=3.0,k5=10.0
1 name2 k2=3.0,k3=5.1
2 name3 k1=7.1,k7=8.2
3 name4 k2=1.2,k3=1.5
4 name5 k2=1.0,k9=1.1
>>> table = df.extract_kv(columns=['kv'], kv_delim='=')
>>> table
name kv_k1 kv_k2 kv_k3 kv_k5 kv_k7 kv_k9
0 name1 1.0 3.0 NaN 10.0 NaN NaN
1 name2 NaN 3.0 5.1 NaN NaN NaN
2 name3 7.1 NaN NaN NaN 8.2 NaN
3 name4 NaN 1.2 1.5 NaN NaN NaN
4 name5 NaN 1.0 NaN NaN NaN 1.1
"""
if columns is None:
columns = [expr._get_field(c) for c in expr.schema.names]
intact_cols = []
else:
columns = [expr._get_field(c) for c in utils.to_list(columns)]
name_set = set([c.name for c in columns])
intact_cols = [expr._get_field(c) for c in expr.schema.names if c not in name_set]
column_type = types.validate_data_type(dtype)
if any(not isinstance(c.dtype, types.String) for c in columns):
raise ExpressionError('Key-value columns must be strings.')
schema = DynamicSchema.from_lists([c.name for c in intact_cols], [c.dtype for c in intact_cols])
return ExtractKVCollectionExpr(_input=expr, _columns=columns, _intact=intact_cols, _schema=schema,
_column_type=column_type, _default=fill_value,
_kv_delimiter=kv_delim, _item_delimiter=item_delim)
def to_kv(expr, columns=None, kv_delim=':', item_delim=',', kv_name='kv_col'):
"""
Merge values in specified columns into a key-value represented column.
:param DataFrame expr: input DataFrame
:param columns: the columns to be merged.
:param str kv_delim: delimiter between key and value.
:param str item_delim: delimiter between key-value pairs.
:param str kv_name: name of the new key-value column
:return: converted data frame
:rtype: DataFrame
:Example:
>>> df
name k1 k2 k3 k5 k7 k9
0 name1 1.0 3.0 NaN 10.0 NaN NaN
1 name2 NaN 3.0 5.1 NaN NaN NaN
2 name3 7.1 NaN NaN NaN 8.2 NaN
3 name4 NaN 1.2 1.5 NaN NaN NaN
4 name5 NaN 1.0 NaN NaN NaN 1.1
>>> table = df.to_kv(columns=['k1', 'k2', 'k3', 'k5', 'k7', 'k9'], kv_delim='=')
>>> table
name kv_col
0 name1 k1=1.0,k2=3.0,k5=10.0
1 name2 k2=3.0,k3=5.1
2 name3 k1=7.1,k7=8.2
3 name4 k2=1.2,k3=1.5
4 name5 k2=1.0,k9=1.1
"""
if columns is None:
columns = [expr._get_field(c) for c in expr.schema.names]
intact_cols = []
else:
columns = [expr._get_field(c) for c in utils.to_list(columns)]
name_set = set([c.name for c in columns])
intact_cols = [expr._get_field(c) for c in expr.schema.names if c not in name_set]
mapped_cols = [c.isnull().ifelse(Scalar(''), c.name + kv_delim + c.astype('string')) for c in columns]
reduced_col = reduce(lambda a, b: (b == '').ifelse(a, (a == '').ifelse(b, a + item_delim + b)), mapped_cols)
return expr.__getitem__(intact_cols + [reduced_col.rename(kv_name)])
def dropna(expr, how='any', thresh=None, subset=None):
"""
Return a DataFrame with rows omitted where any or all of the data are missing, depending on `how`
:param DataFrame expr: input DataFrame
:param how: can be 'any' or 'all'. If 'any' is specified, drop a row when any NA values are present. If 'all' is specified, drop a row only when all of its values are NA.
:param thresh: require that many non-NA values
:param subset: Labels along other axis to consider, e.g. if you are dropping rows these would be a list of columns to include
:return: DataFrame
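:Example:
Illustrative usage, assuming columns `fid` and `isMale` that may contain nulls:
>>> df.dropna(subset=['fid', 'isMale'])
>>> df.dropna(how='all', subset=['fid', 'isMale'])
>>> df.dropna(thresh=1, subset=['fid', 'isMale'])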
"""
if subset is None:
subset = [expr._get_field(c) for c in expr.schema.names]
else:
subset = [expr._get_field(c) for c in utils.to_list(subset)]
if not subset:
raise ValueError('Illegal subset is provided.')
if thresh is None:
thresh = len(subset) if how == 'any' else 1
sum_exprs = reduce(operator.add, (s.notna().ifelse(1, 0) for s in subset))
return expr.filter(sum_exprs >= thresh)
def fillna(expr, value=None, method=None, subset=None):
"""
Fill NA/NaN values using the specified method
:param DataFrame expr: input DataFrame
:param method: can be ‘backfill’, ‘bfill’, ‘pad’, ‘ffill’ or None
:param value: value used to fill NA values
:param subset: Labels along other axis to consider.
:return: DataFrame
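:Example:
Illustrative usage, assuming a column `fid` that may contain nulls:
>>> df.fillna(100, subset=['fid'])
>>> df.fillna(method='ffill', subset=['fid'])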
"""
col_dict = OrderedDict([(c, expr._get_field(c)) for c in expr.schema.names])
if subset is None:
sel_col_names = expr.schema.names
else:
# when c is in expr._fields, _get_field may do substitution, which would cause an error
subset = (c.copy() if isinstance(c, Expr) else c for c in utils.to_list(subset))
sel_col_names = [expr._get_field(c).name for c in subset]
if method is not None and value is not None:
raise ValueError('The argument `method` is not compatible with `value`.')
if method is None and value is None:
raise ValueError('You should supply at least one of `method` and `value`.')
if method is not None and method not in ('backfill', 'bfill', 'pad', 'ffill'):
raise ValueError('Method value %s is illegal.' % str(method))
if method in ('backfill', 'bfill'):
sel_cols = list(reversed(sel_col_names))
else:
sel_cols = sel_col_names
if method is None:
for n in sel_col_names:
e = col_dict[n]
col_dict[n] = e.isna().ifelse(value, e).rename(n)
return expr.select(list(col_dict.values()))
else:
names = list(col_dict.keys())
typs = list(c.dtype.name for c in col_dict.values())
@output(names, typs)
def mapper(row):
last_valid = None
update_dict = dict()
import math
try:
import numpy as np
except ImportError:
np = None
def isnan(v):
if v is None:
return True
if np is not None:
try:
return np.isnan(v)
except TypeError:
pass
try:
return math.isnan(v)
except TypeError:
return False
for n in sel_cols:
old_val = getattr(row, n)
if old_val is None or isnan(old_val):
if last_valid is not None:
update_dict[n] = last_valid
else:
last_valid = old_val
yield row.replace(**update_dict)
return expr.map_reduce(mapper)
def ffill(expr, subset=None):
"""
Fill NA/NaN values with the forward method. Equivalent to fillna(method='ffill').
:param DataFrame expr: input DataFrame.
:param subset: Labels along other axis to consider.
:return: DataFrame
"""
return expr.fillna(method='ffill', subset=subset)
def bfill(expr, subset=None):
"""
Fill NA/NaN values with the backward method. Equivalent to fillna(method='bfill').
:param DataFrame expr: input DataFrame.
:param subset: Labels along other axis to consider.
:return: DataFrame
"""
return expr.fillna(method='bfill', subset=subset)
class AppendIDCollectionExpr(CollectionExpr):
_args = '_input', '_id_col'
node_name = 'AppendID'
def _init(self, *args, **kwargs):
from .element import _scalar
for attr in self._args[1:]:
self._init_attr(attr, None)
super(AppendIDCollectionExpr, self)._init(*args, **kwargs)
self._validate()
self._id_col = _scalar(self._id_col)
self._schema = TableSchema.from_lists(
self._input.schema.names + [self._id_col.value],
self._input.schema.types + [types.int64],
)
def _validate(self):
if self._id_col in self._input.schema:
raise ExpressionError('ID column already exists in current data frame.')
@property
def input(self):
return self._input
def accept(self, visitor):
return visitor.visit_append_id(self)
def _append_id(expr, id_col='append_id'):
return AppendIDCollectionExpr(_input=expr, _id_col=id_col)
def append_id(expr, id_col='append_id'):
"""
Append an ID column to the current DataFrame to form a new DataFrame.
:param str id_col: name of appended ID field.
:return: DataFrame with ID field
:rtype: DataFrame
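:Example:
Illustrative usage; the appended column is named `append_id` unless `id_col` is given:
>>> df.append_id()
>>> df.append_id(id_col='record_id')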
"""
if hasattr(expr, '_xflow_append_id'):
return expr._xflow_append_id(id_col)
else:
return _append_id(expr, id_col)
class SplitCollectionExpr(CollectionExpr):
_args = '_input', '_frac', '_seed', '_split_id'
node_name = 'Split'
def _init(self, *args, **kwargs):
from .element import _scalar
for attr in self._args[1:]:
self._init_attr(attr, None)
super(SplitCollectionExpr, self)._init(*args, **kwargs)
self._frac = _scalar(self._frac)
self._seed = _scalar(self._seed, types.int32)
self._split_id = _scalar(self._split_id, types.int32)
self._schema = self._input.schema
@property
def input(self):
return self._input
def accept(self, visitor):
return visitor.visit_split(self)
def _split(expr, frac, seed=None):
seed = seed or int(time.time())
return (
SplitCollectionExpr(_input=expr, _frac=frac, _seed=seed, _split_id=0),
SplitCollectionExpr(_input=expr, _frac=frac, _seed=seed, _split_id=1),
)
def split(expr, frac, seed=None):
"""
Split the current DataFrame into two DataFrame objects with a certain ratio.
:param float frac: Split ratio
:return: two split DataFrame objects
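:param seed: random seed; the current timestamp is used when omitted
:Example:
Illustrative usage, splitting into 60% / 40% parts:
>>> train, test = df.split(0.6)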
"""
if hasattr(expr, '_xflow_split'):
return expr._xflow_split(frac, seed=seed)
else:
return _split(expr, frac, seed=seed)
def applymap(expr, func, rtype=None, resources=None, columns=None, excludes=None, args=(), **kwargs):
"""
Call func on each element of this collection.
:param func: lambda, function, :class:`odps.models.Function`,
or str which is the name of :class:`odps.models.Function`
:param rtype: if not provided, will be the dtype of this sequence
:param columns: columns to apply this function on
:param excludes: columns to skip when applying the function
:return: a new collection
:Example:
>>> df.applymap(lambda x: x + 1)
"""
if columns is not None and excludes is not None:
raise ValueError('`columns` and `excludes` cannot be provided at the same time.')
if not columns:
excludes = excludes or []
if isinstance(excludes, six.string_types):
excludes = [excludes]
excludes = set([c if isinstance(c, six.string_types) else c.name for c in excludes])
columns = set([c for c in expr.schema.names if c not in excludes])
else:
if isinstance(columns, six.string_types):
columns = [columns]
columns = set([c if isinstance(c, six.string_types) else c.name for c in columns])
mapping = [expr[c] if c not in columns
else expr[c].map(func, rtype=rtype, resources=resources, args=args, **kwargs)
for c in expr.schema.names]
return expr.select(*mapping)
_collection_methods = dict(
sort_values=sort_values,
sort=sort_values,
distinct=distinct,
apply=apply,
reshuffle=reshuffle,
map_reduce=map_reduce,
sample=sample,
__sample=__df_sample,
pivot=pivot,
melt=melt,
pivot_table=pivot_table,
extract_kv=extract_kv,
to_kv=to_kv,
dropna=dropna,
fillna=fillna,
ffill=ffill,
bfill=bfill,
min_max_scale=min_max_scale,
std_scale=std_scale,
_append_id=_append_id,
append_id=append_id,
_split=_split,
split=split,
applymap=applymap,
)
_sequence_methods = dict(
unique=unique
)
utils.add_method(CollectionExpr, _collection_methods)
utils.add_method(SequenceExpr, _sequence_methods)