def apply()

in odps/df/expr/collections.py [0:0]


def apply(expr, func, axis=0, names=None, types=None, reduce=False,
          resources=None, keep_nulls=False, cu_request=None, args=(), **kwargs):
    """
    Apply a function to a row when axis=1 or column when axis=0.

    :param expr: collection to apply the function on. When it is not a
        ``CollectionExpr`` nothing is applied and ``None`` is returned
    :param func: function to apply. When it is a ``FunctionWrapper``, its
        ``output_names`` / ``output_types`` are used as defaults for ``names``
        and ``types`` and the wrapped function is applied
    :param axis: row when axis=1 else column
    :param names: output names
    :param types: output types, a single type name is accepted as well.
        When omitted, the keyword ``rtype`` is used instead if present
    :param reduce: if True will return a sequence else return a collection,
        only consulted when axis=1
    :param resources: resources to read
    :param keep_nulls: if True, keep rows producing empty results, only work in lateral views
    :param cu_request: passed through unchanged to the row-wise expression,
        only consulted when axis=1
    :param args: args for function, only forwarded when axis=1
    :param kwargs: kwargs for function, only forwarded when axis=1
    :return: a ``Summary`` when axis=0; when axis=1 a ``MappedExpr`` if
        ``reduce`` is True, otherwise the result of ``_apply_horizontal``
    :raises ValueError: if ``reduce`` is True and more than one name is given

    :Example:

    Apply a function to a row:

    >>> from odps.df import output
    >>>
    >>> @output(['iris_add', 'iris_sub'], ['float', 'float'])
    >>> def handle(row):
    >>>     yield row.sepallength + row.sepalwidth, row.sepallength - row.sepalwidth
    >>>     yield row.petallength + row.petalwidth, row.petallength - row.petalwidth
    >>>
    >>> iris.apply(handle, axis=1).count()


    Apply a function to a column:

    >>> class Agg(object):
    >>>
    >>>     def buffer(self):
    >>>         return [0.0, 0]
    >>>
    >>>     def __call__(self, buffer, val):
    >>>         buffer[0] += val
    >>>         buffer[1] += 1
    >>>
    >>>     def merge(self, buffer, pbuffer):
    >>>         buffer[0] += pbuffer[0]
    >>>         buffer[1] += pbuffer[1]
    >>>
    >>>     def getvalue(self, buffer):
    >>>         if buffer[1] == 0:
    >>>             return 0.0
    >>>         return buffer[0] / buffer[1]
    >>>
    >>> iris.exclude('name').apply(Agg)
    """

    # ``rtype`` is accepted as an alias of ``types``; an explicit ``types`` wins
    # and leaves ``rtype`` inside ``kwargs``.
    if types is None and "rtype" in kwargs:
        types = kwargs.pop("rtype")

    # Kept for backward compatibility: non-collection inputs are ignored
    # instead of raising.
    if not isinstance(expr, CollectionExpr):
        return None

    if isinstance(func, FunctionWrapper):
        names = names or func.output_names
        types = types or func.output_types
        func = func._func

    if axis == 0:
        types = types or expr.schema.types
        # A bare type name would otherwise be iterated character by character.
        if isinstance(types, six.string_types):
            types = [types]
        types = [validate_data_type(t) for t in types]

        fields = [expr[n].agg(func, rtype=t, resources=resources)
                  for n, t in zip(expr.schema.names, types)]
        if names:
            fields = [f.rename(n) for f, n in zip(fields, names)]
        else:
            names = [f.name for f in fields]
        return Summary(_input=expr, _fields=fields, _schema=TableSchema.from_lists(names, types))
    else:
        collection_resources = utils.get_collection_resources(resources)

        if types is not None:
            if isinstance(types, list):
                types = tuple(types)
            elif isinstance(types, six.string_types):
                types = (types,)

            types = tuple(validate_data_type(t) for t in types)
        if reduce:
            from .element import MappedExpr
            from ..backends.context import context

            # A single name given as a plain string must not be measured by
            # its character count.
            if isinstance(names, six.string_types):
                names = [names]
            if names is not None and len(names) > 1:
                raise ValueError('When reduce, at most one name can be specified')
            # Truthiness (not ``is not None``) so that an empty sequence does
            # not end up in an IndexError.
            name = names[0] if names else None
            if not types and kwargs.get('rtype', None) is not None:
                # Validate here as well so that every type reaching MappedExpr
                # went through the same normalization.
                types = (validate_data_type(kwargs.pop('rtype')),)
            tp = types[0] if types else (utils.get_annotation_rtype(func) or string)
            if not context.is_cached(expr) and (hasattr(expr, '_fields') and expr._fields is not None):
                # Copy each field's tree, stopping at nodes that sit directly
                # on top of the collection's input.
                inputs = [e.copy_tree(stop_cond=lambda x: any(i is expr.input for i in x.children()))
                          for e in expr._fields]
            else:
                inputs = [expr[n] for n in expr.schema.names]
            return MappedExpr(_func=func, _func_args=args, _func_kwargs=kwargs,
                              _name=name, _data_type=tp, _inputs=inputs, _multiple=True,
                              _resources=resources, _cu_request=cu_request,
                              _collection_resources=collection_resources)
        else:
            return _apply_horizontal(expr, func, names=names, types=types, resources=resources,
                                     collection_resources=collection_resources, keep_nulls=keep_nulls,
                                     cu_request=cu_request, args=args, **kwargs)