in odps/df/expr/collections.py [0:0]
def apply(expr, func, axis=0, names=None, types=None, reduce=False,
          resources=None, keep_nulls=False, cu_request=None, args=(), **kwargs):
    """
    Apply a function to a row when axis=1 or column when axis=0.

    :param expr: collection to apply the function on; if it is not a
        ``CollectionExpr`` nothing is done and ``None`` is returned
    :param func: function to apply; when it is a ``FunctionWrapper`` its
        ``output_names`` / ``output_types`` are used as defaults for
        ``names`` / ``types`` and the wrapped function is applied
    :param axis: row when axis=1 else column
    :param names: output names; with ``reduce=True`` at most one name is
        allowed and a bare string is accepted as that single name
    :param types: output types, a list / tuple of types or a single type
        name string (row mode); the keyword ``rtype`` is accepted as an
        alias when ``types`` is not given
    :param reduce: if True will return a sequence else return a collection
    :param resources: resources to read
    :param keep_nulls: if True, keep rows producing empty results, only work in lateral views
    :param cu_request: forwarded unchanged to the row-wise expression that
        is built (not used when axis=0)
    :param args: args for function
    :param kwargs: kwargs for function
    :return: ``None`` when ``expr`` is not a collection, a ``Summary`` when
        axis=0, a ``MappedExpr`` when ``reduce`` is True, otherwise the
        result of ``_apply_horizontal``
    :raises ValueError: if ``reduce`` is True and more than one name is given

    :Example:

    Apply a function to a row:

    >>> from odps.df import output
    >>>
    >>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
    >>> def handle(row):
    >>>     yield row.sepallength - row.sepalwidth, row.sepallength + row.sepalwidth
    >>>     yield row.petallength - row.petalwidth, row.petallength + row.petalwidth
    >>>
    >>> iris.apply(handle, axis=1).count()

    Apply a function to a column:

    >>> class Agg(object):
    >>>
    >>>     def buffer(self):
    >>>         return [0.0, 0]
    >>>
    >>>     def __call__(self, buffer, val):
    >>>         buffer[0] += val
    >>>         buffer[1] += 1
    >>>
    >>>     def merge(self, buffer, pbuffer):
    >>>         buffer[0] += pbuffer[0]
    >>>         buffer[1] += pbuffer[1]
    >>>
    >>>     def getvalue(self, buffer):
    >>>         if buffer[1] == 0:
    >>>             return 0.0
    >>>         return buffer[0] / buffer[1]
    >>>
    >>> iris.exclude('name').apply(Agg)
    """
    # ``rtype`` is an alias of ``types``; pop it so that it is not forwarded
    # to the user function as a keyword argument.
    if types is None and "rtype" in kwargs:
        types = kwargs.pop("rtype")

    if not isinstance(expr, CollectionExpr):
        return

    if isinstance(func, FunctionWrapper):
        # Explicit arguments win over what the wrapper declares.
        names = names or func.output_names
        types = types or func.output_types
        func = func._func

    if axis == 0:
        # Column mode: aggregate every column with ``func``; output types
        # default to the types of the input columns.
        types = types or expr.schema.types
        types = [validate_data_type(t) for t in types]

        fields = [expr[n].agg(func, rtype=t, resources=resources)
                  for n, t in zip(expr.schema.names, types)]
        if names:
            fields = [f.rename(n) for f, n in zip(fields, names)]
        else:
            names = [f.name for f in fields]
        return Summary(_input=expr, _fields=fields,
                       _schema=TableSchema.from_lists(names, types))

    # Row mode.
    collection_resources = utils.get_collection_resources(resources)

    if types is not None:
        if isinstance(types, list):
            types = tuple(types)
        elif isinstance(types, six.string_types):
            types = (types,)
        types = tuple(validate_data_type(t) for t in types)

    if not reduce:
        return _apply_horizontal(expr, func, names=names, types=types, resources=resources,
                                 collection_resources=collection_resources, keep_nulls=keep_nulls,
                                 cu_request=cu_request, args=args, **kwargs)

    from .element import MappedExpr
    from ..backends.context import context

    # Accept a bare string as the single output name, the same way a bare
    # string is accepted for ``types``; otherwise it would be treated as a
    # sequence of one-character names.
    if isinstance(names, six.string_types):
        names = [names]
    if names is not None and len(names) > 1:
        raise ValueError('When reduce, at most one name can be specified')
    # An empty ``names`` means "no name" rather than an IndexError.
    name = names[0] if names else None

    if not types and kwargs.get('rtype', None) is not None:
        # Reached when ``types`` was given but empty; validate the type just
        # like every other entry of ``types``.
        types = (validate_data_type(kwargs.pop('rtype')),)
    # Use truthiness so that an empty ``types`` falls back to the annotated
    # return type (or string) instead of failing on ``types[0]``.
    tp = types[0] if types else (utils.get_annotation_rtype(func) or string)

    if not context.is_cached(expr) and (hasattr(expr, '_fields') and expr._fields is not None):
        # Copy each field's expression tree, stopping at nodes that have
        # ``expr.input`` as a direct child.
        inputs = [e.copy_tree(stop_cond=lambda x: any(i is expr.input for i in x.children()))
                  for e in expr._fields]
    else:
        inputs = [expr[n] for n in expr.schema.names]

    return MappedExpr(_func=func, _func_args=args, _func_kwargs=kwargs,
                      _name=name, _data_type=tp, _inputs=inputs, _multiple=True,
                      _resources=resources, _cu_request=cu_request,
                      _collection_resources=collection_resources)