# map_reduce() — excerpt from odps/df/expr/collections.py

def map_reduce(expr, mapper=None, reducer=None, group=None, sort=None, ascending=True,
               combiner=None, combiner_buffer_size=1024,
               mapper_output_names=None, mapper_output_types=None, mapper_resources=None, mapper_cu=None,
               reducer_output_names=None, reducer_output_types=None, reducer_resources=None, reducer_cu=None):
    """
    MapReduce API, mapper or reducer should be provided.

    The pipeline built here is: ``expr.apply(mapper)`` -> ``groupby(group).sort(sort)``
    -> ``apply(reducer)``.  Either stage may be omitted (``None``), in which case the
    input passes through unchanged for that stage.

    :param expr: input collection expression the map-reduce is applied to
    :param mapper: mapper function or class
    :param reducer: reducer function or class
    :param group: the keys to group after mapper
    :param sort: the keys to sort after mapper
    :param ascending: True if ascending else False
    :param combiner: combiner function or class, combiner's output should be equal to mapper
    :param combiner_buffer_size: combiner's buffer size, 1024 as default
    :param mapper_output_names: mapper's output names
    :param mapper_output_types: mapper's output types
    :param mapper_resources: the resources for mapper
    :param mapper_cu: CU request passed through to the mapper's apply call
    :param reducer_output_names: reducer's output names
    :param reducer_output_types: reducer's output types
    :param reducer_resources: the resources for reducer
    :param reducer_cu: CU request passed through to the reducer's apply call
    :return: the resulting collection expression (the reducer's apply, or the
             mapped collection when ``reducer`` is None)

    :Example:

    >>> from odps.df import output
    >>>
    >>> @output(['word', 'cnt'], ['string', 'int'])
    >>> def mapper(row):
    >>>     for word in row[0].split():
    >>>         yield word.lower(), 1
    >>>
    >>> @output(['word', 'cnt'], ['string', 'int'])
    >>> def reducer(keys):
    >>>     cnt = [0]
    >>>     def h(row, done):  # done illustrates that all the rows of the keys are processed
    >>>         cnt[0] += row.cnt
    >>>         if done:
    >>>             yield keys.word, cnt[0]
    >>>     return h
    >>>
    >>> words_df.map_reduce(mapper, reducer, group='word')
    """

    # Unwrap functools.partial over an @output-decorated callable so the
    # output_names/output_types metadata survives partial application.
    def _adjust_partial(fun):
        if isinstance(fun, functools.partial) and isinstance(fun.func, FunctionWrapper):
            wrapped_fun = fun.func
            # Re-apply the partial to the raw function, then re-wrap it carrying
            # over the original wrapper's declared schema.
            partial_fun = functools.partial(wrapped_fun._func, *fun.args, **fun.keywords)
            ret_fun = FunctionWrapper(partial_fun)
            ret_fun.output_names = wrapped_fun.output_names
            ret_fun.output_types = wrapped_fun.output_types
            return ret_fun
        else:
            return fun

    # Normalize a scalar/tuple/list argument to a list; None stays None.
    def conv(l):
        if l is None:
            return
        if isinstance(l, tuple):
            l = list(l)
        elif not isinstance(l, list):
            l = [l, ]

        return l

    # Generate a unique column name for anonymous mapper outputs.
    def gen_name():
        return 'pyodps_field_%s' % str(uuid.uuid4()).replace('-', '_')

    # Build a row-at-a-time UDF class that drives the user's reducer protocol:
    # ``reducer(keys)`` returns a handler ``h(row, done)`` which is fed every
    # row of a key group, with ``done`` True on the group's last row.
    def _gen_actual_reducer(reducer, group):
        class ActualReducer(object):
            def __init__(self, resources=None):
                self._func = reducer
                self._curr = None        # key tuple of the group currently open
                self._prev_rows = None   # one-row lookahead buffer (see __call__)
                self._key_named_tuple = namedtuple('NamedKeys', group)

                self._resources = resources
                self._f = None           # the per-group handler returned by reducer(keys)

            def _is_generator_function(self, f):
                # Accept both plain generator functions and callables whose
                # __call__ is a generator function (class-based handlers).
                if inspect.isgeneratorfunction(f):
                    return True
                elif hasattr(f, '__call__') and inspect.isgeneratorfunction(f.__call__):
                    return True
                return False

            def __call__(self, row):
                # The ``done`` flag can only be known one row late: a group
                # ends when the *next* row carries a different key.  So each
                # call emits output for the previously buffered row, using the
                # current row's key to decide whether that group has ended.
                key = tuple(getattr(row, n) for n in group)
                k = self._key_named_tuple(*key)

                if self._prev_rows is not None:
                    key_consumed = self._curr != key
                    if self._is_generator_function(self._f):
                        for it in self._f(self._prev_rows, key_consumed):
                            yield it
                    else:
                        res = self._f(self._prev_rows, key_consumed)
                        if res:
                            yield res

                self._prev_rows = row

                if self._curr is None or self._curr != key:
                    # New key group: create a fresh handler.  On the very first
                    # group, if resources were supplied, the user callable is
                    # first called with them to obtain the actual reducer factory.
                    self._curr = key
                    if self._resources and self._f is None:
                        self._func = self._func(self._resources)
                    self._f = self._func(k)

            def close(self):
                # Flush the last buffered row; it is necessarily the final row
                # of its group, hence done=True.
                if self._prev_rows and self._curr:
                    if self._is_generator_function(self._f):
                        for it in self._f(self._prev_rows, True):
                            yield it
                    else:
                        res = self._f(self._prev_rows, True)
                        if res:
                            yield res
                self._prev_rows = None

        return ActualReducer

    # Wrap the mapper so its output rows are buffered, sorted by the group/sort
    # keys, and run through the combiner locally (map-side combine) before
    # being emitted downstream.
    def _gen_combined_mapper(mapper, combiner, names, group, sort, ascending,
                             buffer_size, mapper_resources=None):
        mapper = mapper if not isinstance(mapper, FunctionWrapper) else mapper._func
        sort_indexes = [names.index(s) for s in sort]
        if isinstance(ascending, bool):
            ascending = [ascending] * len(sort)

        class CombinedMapper(object):
            def __init__(self, resources=None):
                if mapper_resources:
                    self.f = mapper(resources)
                elif inspect.isclass(mapper):
                    self.f = mapper()
                else:
                    self.f = mapper
                self.buffer = list()
                if inspect.isfunction(self.f):
                    self.is_generator = inspect.isgeneratorfunction(self.f)
                else:
                    self.is_generator = inspect.isgeneratorfunction(self.f.__call__)

            def _cmp_to_key(self, cmp):
                """Convert a cmp= function into a key= function"""
                # NOTE(review): mirrors functools.cmp_to_key — presumably kept
                # inline so the UDF class stays self-contained when serialized.

                class K(object):
                    def __init__(self, obj):
                        self.obj = obj

                    def __lt__(self, other):
                        return cmp(self.obj, other.obj) < 0

                    def __gt__(self, other):
                        return cmp(self.obj, other.obj) > 0

                    def __eq__(self, other):
                        return cmp(self.obj, other.obj) == 0

                    def __le__(self, other):
                        return cmp(self.obj, other.obj) <= 0

                    def __ge__(self, other):
                        return cmp(self.obj, other.obj) >= 0

                    def __ne__(self, other):
                        return cmp(self.obj, other.obj) != 0

                return K

            def _combine(self):
                # Multi-key comparison honouring the per-key ascending flags.
                def cmp(x, y):
                    for asc, sort_idx in zip(ascending, sort_indexes):
                        indict = 1 if asc else -1
                        if x[sort_idx] > y[sort_idx]:
                            return indict * 1
                        elif x[sort_idx] < y[sort_idx]:
                            return indict * -1
                        else:
                            continue

                    return 0

                self.buffer.sort(key=self._cmp_to_key(cmp))

                # The combiner obeys the same protocol as a reducer, so reuse
                # the same driver over the sorted local buffer.
                ActualCombiner = _gen_actual_reducer(combiner, group)
                ac = ActualCombiner()
                named_row = namedtuple('NamedRow', names)
                for r in self.buffer:
                    row = named_row(*r)
                    for l in ac(row):
                        yield l
                for l in ac.close():
                    yield l

                self.buffer = []

            def _handle_output_line(self, line):
                # Flush through the combiner once the buffer is full; the new
                # line is appended after the flush, so the buffer never exceeds
                # buffer_size entries.
                if len(self.buffer) >= buffer_size:
                    for l in self._combine():
                        yield l

                self.buffer.append(line)

            def __call__(self, row):
                if self.is_generator:
                    for it in self.f(row):
                        for l in self._handle_output_line(it):
                            yield l
                else:
                    for l in self._handle_output_line(self.f(row)):
                        yield l

            def close(self):
                # Flush whatever remains at end of input.
                if len(self.buffer) > 0:
                    for l in self._combine():
                        yield l

        return CombinedMapper

    mapper = _adjust_partial(mapper)
    reducer = _adjust_partial(reducer)
    combiner = _adjust_partial(combiner)

    # Explicit output names/types take precedence over those declared via @output.
    if isinstance(mapper, FunctionWrapper):
        mapper_output_names = mapper_output_names or mapper.output_names
        mapper_output_types = mapper_output_types or mapper.output_types

    mapper_output_names = conv(mapper_output_names)
    mapper_output_types = conv(mapper_output_types)

    # Types without names: synthesize unique column names.
    if mapper_output_types is not None and mapper_output_names is None:
        mapper_output_names = [gen_name() for _ in range(len(mapper_output_types))]

    # Identity mapper: columns are simply the input schema's.
    if mapper is None and mapper_output_names is None:
        mapper_output_names = expr.schema.names

    # Default grouping is on all mapper output columns; sort keys always
    # include the group keys first (deduplicated, order preserved).
    group = conv(group) or mapper_output_names
    sort = sort or tuple()
    sort = list(OrderedDict.fromkeys(group + conv(sort)))
    if len(sort) > len(group):
        # Extra sort keys: a scalar `ascending` applies to them only; a list
        # shorter than `sort` is padded with True for the group-key prefix.
        ascending = [ascending, ] * (len(sort) - len(group)) \
            if isinstance(ascending, bool) else list(ascending)
        if len(ascending) != len(sort):
            ascending = [True] * len(group) + ascending

    if not set(group + sort).issubset(mapper_output_names):
        raise ValueError('group and sort have to be the column names of mapper')

    if mapper is None:
        # Pass-through map stage: declared schema (if any) must match the input.
        if mapper_output_names and mapper_output_names != expr.schema.names:
            raise ExpressionError(
                'Null mapper cannot have mapper output names: %s' % mapper_output_names)
        if mapper_output_types and mapper_output_types != expr.schema.types:
            raise ExpressionError(
                'Null mapper cannot have mapper output types: %s' % mapper_output_types)
        mapped = expr
        if combiner is not None:
            raise ValueError('Combiner is not null when mapper is null')
    else:
        if combiner is not None:
            # A combiner must be schema-compatible with the mapper, since its
            # output feeds the same downstream group/sort/reduce stages.
            if isinstance(combiner, FunctionWrapper):
                if combiner.output_names and \
                        combiner.output_names != mapper_output_names:
                    raise ExpressionError(
                        'Combiner must have the same output names with mapper')
                if combiner.output_types and \
                        combiner.output_types != mapper_output_types:
                    raise ExpressionError(
                        'Combiner must have the same output types with mapper')
                combiner = combiner._func
            mapper = _gen_combined_mapper(mapper, combiner, mapper_output_names,
                                          group, sort, ascending, combiner_buffer_size,
                                          mapper_resources=mapper_resources)
        mapped = expr.apply(mapper, axis=1, names=mapper_output_names,
                            types=mapper_output_types, resources=mapper_resources,
                            cu_request=mapper_cu)

    # Shuffle stage: group then sort within each group.
    clustered = mapped.groupby(group).sort(sort, ascending=ascending)

    if isinstance(reducer, FunctionWrapper):
        reducer_output_names = reducer_output_names or reducer.output_names
        reducer_output_types = reducer_output_types or reducer.output_types
        reducer = reducer._func

    if reducer is None:
        # Pass-through reduce stage: declared schema (if any) must match the
        # mapped collection; note the un-clustered `mapped` is returned.
        if reducer_output_names and reducer_output_names != mapped.schema.names:
            raise ExpressionError(
                'Null reducer cannot have reducer output names %s' % reducer_output_names)
        if reducer_output_types and reducer_output_types != mapped.schema.types:
            raise ExpressionError(
                'Null reducer cannot have reducer output types %s' % reducer_output_types)
        return mapped

    ActualReducer = _gen_actual_reducer(reducer, group)
    return clustered.apply(ActualReducer, resources=reducer_resources,
                           names=reducer_output_names, types=reducer_output_types,
                           cu_request=reducer_cu)