in odps/df/expr/collections.py [0:0]
def map_reduce(expr, mapper=None, reducer=None, group=None, sort=None, ascending=True,
               combiner=None, combiner_buffer_size=1024,
               mapper_output_names=None, mapper_output_types=None, mapper_resources=None, mapper_cu=None,
               reducer_output_names=None, reducer_output_types=None, reducer_resources=None, reducer_cu=None):
    """
    MapReduce API, mapper or reducer should be provided.

    :param expr: input collection expression
    :param mapper: mapper function or class
    :param reducer: reducer function or class
    :param group: the keys to group after mapper
    :param sort: the keys to sort after mapper
    :param ascending: True if ascending else False
    :param combiner: combiner function or class, combiner's output should be equal to mapper
    :param combiner_buffer_size: combiner's buffer size, 1024 as default
    :param mapper_output_names: mapper's output names
    :param mapper_output_types: mapper's output types
    :param mapper_resources: the resources for mapper
    :param mapper_cu: CU request for the mapper stage
    :param reducer_output_names: reducer's output names
    :param reducer_output_types: reducer's output types
    :param reducer_resources: the resources for reducer
    :param reducer_cu: CU request for the reducer stage
    :return: resulting collection expression

    :Example:

    >>> from odps.df import output
    >>>
    >>> @output(['word', 'cnt'], ['string', 'int'])
    >>> def mapper(row):
    >>>     for word in row[0].split():
    >>>         yield word.lower(), 1
    >>>
    >>> @output(['word', 'cnt'], ['string', 'int'])
    >>> def reducer(keys):
    >>>     cnt = [0]
    >>>     def h(row, done):  # done illustrates that all the rows of the keys are processed
    >>>         cnt[0] += row.cnt
    >>>         if done:
    >>>             yield keys.word, cnt[0]
    >>>     return h
    >>>
    >>> words_df.map_reduce(mapper, reducer, group='word')
    """
    def _adjust_partial(fun):
        # If a functools.partial wraps a FunctionWrapper (i.e. an @output-decorated
        # function), rebuild it as a FunctionWrapper around a partial of the raw
        # function so that the declared output names/types are kept while the
        # bound arguments still apply.
        if isinstance(fun, functools.partial) and isinstance(fun.func, FunctionWrapper):
            wrapped_fun = fun.func
            partial_fun = functools.partial(wrapped_fun._func, *fun.args, **fun.keywords)
            ret_fun = FunctionWrapper(partial_fun)
            ret_fun.output_names = wrapped_fun.output_names
            ret_fun.output_types = wrapped_fun.output_types
            return ret_fun
        else:
            return fun

    def conv(l):
        # Normalize a scalar / tuple / list argument into a list; None stays None.
        if l is None:
            return
        if isinstance(l, tuple):
            l = list(l)
        elif not isinstance(l, list):
            l = [l, ]
        return l

    def gen_name():
        # Generate a unique column name for unnamed mapper outputs.
        return 'pyodps_field_%s' % str(uuid.uuid4()).replace('-', '_')

    def _gen_actual_reducer(reducer, group):
        # Build a row-at-a-time callable class around the user's reducer.
        # It relies on the rows arriving clustered by the group keys (the caller
        # feeds it from groupby(group).sort(sort)): a key change between two
        # consecutive rows marks the end of the previous group.
        class ActualReducer(object):
            def __init__(self, resources=None):
                self._func = reducer
                self._curr = None        # key tuple of the group currently being consumed
                self._prev_rows = None   # last row seen; emitted on the next call/close
                self._key_named_tuple = namedtuple('NamedKeys', group)
                self._resources = resources
                self._f = None           # per-group row handler returned by the user reducer

            def _is_generator_function(self, f):
                # True for plain generator functions and for callables whose
                # __call__ is a generator function.
                if inspect.isgeneratorfunction(f):
                    return True
                elif hasattr(f, '__call__') and inspect.isgeneratorfunction(f.__call__):
                    return True
                return False

            def __call__(self, row):
                key = tuple(getattr(row, n) for n in group)
                k = self._key_named_tuple(*key)

                # Emit the buffered previous row; `done` is True iff the key
                # just changed, i.e. the previous row was the last of its group.
                if self._prev_rows is not None:
                    key_consumed = self._curr != key
                    if self._is_generator_function(self._f):
                        for it in self._f(self._prev_rows, key_consumed):
                            yield it
                    else:
                        res = self._f(self._prev_rows, key_consumed)
                        if res:
                            yield res

                self._prev_rows = row

                # On the first row or a key change, start a new group: feed the
                # resources once (first group only), then ask the user reducer
                # for this group's row handler.
                if self._curr is None or self._curr != key:
                    self._curr = key
                    if self._resources and self._f is None:
                        self._func = self._func(self._resources)
                    self._f = self._func(k)

            def close(self):
                # Flush the final buffered row with done=True.
                if self._prev_rows and self._curr:
                    if self._is_generator_function(self._f):
                        for it in self._f(self._prev_rows, True):
                            yield it
                    else:
                        res = self._f(self._prev_rows, True)
                        if res:
                            yield res
                self._prev_rows = None

        return ActualReducer

    def _gen_combined_mapper(mapper, combiner, names, group, sort, ascending,
                             buffer_size, mapper_resources=None):
        # Fuse the user mapper with a map-side combiner: mapper output rows are
        # buffered, sorted by the sort keys, and run through the combiner
        # (wrapped like a reducer) whenever the buffer reaches buffer_size,
        # plus once more at close().
        mapper = mapper if not isinstance(mapper, FunctionWrapper) else mapper._func
        sort_indexes = [names.index(s) for s in sort]
        if isinstance(ascending, bool):
            ascending = [ascending] * len(sort)

        class CombinedMapper(object):
            def __init__(self, resources=None):
                if mapper_resources:
                    self.f = mapper(resources)
                elif inspect.isclass(mapper):
                    self.f = mapper()
                else:
                    self.f = mapper
                self.buffer = list()
                if inspect.isfunction(self.f):
                    self.is_generator = inspect.isgeneratorfunction(self.f)
                else:
                    self.is_generator = inspect.isgeneratorfunction(self.f.__call__)

            def _cmp_to_key(self, cmp):
                """Convert a cmp= function into a key= function"""
                class K(object):
                    def __init__(self, obj):
                        self.obj = obj

                    def __lt__(self, other):
                        return cmp(self.obj, other.obj) < 0

                    def __gt__(self, other):
                        return cmp(self.obj, other.obj) > 0

                    def __eq__(self, other):
                        return cmp(self.obj, other.obj) == 0

                    def __le__(self, other):
                        return cmp(self.obj, other.obj) <= 0

                    def __ge__(self, other):
                        return cmp(self.obj, other.obj) >= 0

                    def __ne__(self, other):
                        return cmp(self.obj, other.obj) != 0
                return K

            def _combine(self):
                # Sort buffered rows on the sort keys (honoring per-key
                # ascending flags), then stream them through the combiner.
                def cmp(x, y):
                    for asc, sort_idx in zip(ascending, sort_indexes):
                        indict = 1 if asc else -1
                        if x[sort_idx] > y[sort_idx]:
                            return indict * 1
                        elif x[sort_idx] < y[sort_idx]:
                            return indict * -1
                        else:
                            continue
                    return 0

                self.buffer.sort(key=self._cmp_to_key(cmp))
                ActualCombiner = _gen_actual_reducer(combiner, group)
                ac = ActualCombiner()
                named_row = namedtuple('NamedRow', names)
                for r in self.buffer:
                    row = named_row(*r)
                    for l in ac(row):
                        yield l
                # Flush the combiner's last pending row.
                for l in ac.close():
                    yield l
                self.buffer = []

            def _handle_output_line(self, line):
                # Combine (and emit) before appending once the buffer is full.
                if len(self.buffer) >= buffer_size:
                    for l in self._combine():
                        yield l
                self.buffer.append(line)

            def __call__(self, row):
                if self.is_generator:
                    for it in self.f(row):
                        for l in self._handle_output_line(it):
                            yield l
                else:
                    for l in self._handle_output_line(self.f(row)):
                        yield l

            def close(self):
                # Combine whatever is left in the buffer at end of input.
                if len(self.buffer) > 0:
                    for l in self._combine():
                        yield l

        return CombinedMapper

    # Unwrap partials so @output metadata survives functools.partial binding.
    mapper = _adjust_partial(mapper)
    reducer = _adjust_partial(reducer)
    combiner = _adjust_partial(combiner)

    # Explicit arguments win over names/types declared via @output on the mapper.
    if isinstance(mapper, FunctionWrapper):
        mapper_output_names = mapper_output_names or mapper.output_names
        mapper_output_types = mapper_output_types or mapper.output_types

    mapper_output_names = conv(mapper_output_names)
    mapper_output_types = conv(mapper_output_types)

    # Types without names: synthesize unique names. Null mapper without names:
    # pass the input schema through.
    if mapper_output_types is not None and mapper_output_names is None:
        mapper_output_names = [gen_name() for _ in range(len(mapper_output_types))]
    if mapper is None and mapper_output_names is None:
        mapper_output_names = expr.schema.names

    # Default grouping is on all mapper output columns; sort keys always
    # start with the group keys (deduplicated, order preserved).
    group = conv(group) or mapper_output_names
    sort = sort or tuple()
    sort = list(OrderedDict.fromkeys(group + conv(sort)))
    if len(sort) > len(group):
        # `ascending` only applies to the extra sort keys; group keys are
        # padded with True so its length matches `sort`.
        ascending = [ascending, ] * (len(sort) - len(group)) \
            if isinstance(ascending, bool) else list(ascending)
        if len(ascending) != len(sort):
            ascending = [True] * len(group) + ascending

    if not set(group + sort).issubset(mapper_output_names):
        raise ValueError('group and sort have to be the column names of mapper')

    if mapper is None:
        # Null mapper: the input collection is used directly, so any declared
        # mapper output schema must match it exactly, and a combiner is illegal.
        if mapper_output_names and mapper_output_names != expr.schema.names:
            raise ExpressionError(
                'Null mapper cannot have mapper output names: %s' % mapper_output_names)
        if mapper_output_types and mapper_output_types != expr.schema.types:
            raise ExpressionError(
                'Null mapper cannot have mapper output types: %s' % mapper_output_types)
        mapped = expr
        if combiner is not None:
            raise ValueError('Combiner is not null when mapper is null')
    else:
        if combiner is not None:
            # A combiner must produce exactly the mapper's schema, since its
            # output replaces mapper output on the way to the reducer.
            if isinstance(combiner, FunctionWrapper):
                if combiner.output_names and \
                        combiner.output_names != mapper_output_names:
                    raise ExpressionError(
                        'Combiner must have the same output names with mapper')
                if combiner.output_types and \
                        combiner.output_types != mapper_output_types:
                    raise ExpressionError(
                        'Combiner must have the same output types with mapper')
                combiner = combiner._func
            mapper = _gen_combined_mapper(mapper, combiner, mapper_output_names,
                                          group, sort, ascending, combiner_buffer_size,
                                          mapper_resources=mapper_resources)
        mapped = expr.apply(mapper, axis=1, names=mapper_output_names,
                            types=mapper_output_types, resources=mapper_resources,
                            cu_request=mapper_cu)

    # Cluster mapper output so ActualReducer sees each group's rows contiguously.
    clustered = mapped.groupby(group).sort(sort, ascending=ascending)

    if isinstance(reducer, FunctionWrapper):
        reducer_output_names = reducer_output_names or reducer.output_names
        reducer_output_types = reducer_output_types or reducer.output_types
        reducer = reducer._func

    if reducer is None:
        # Null reducer: return the (clustered-input) mapper result as-is; any
        # declared reducer schema must match the mapped schema exactly.
        if reducer_output_names and reducer_output_names != mapped.schema.names:
            raise ExpressionError(
                'Null reducer cannot have reducer output names %s' % reducer_output_names)
        if reducer_output_types and reducer_output_types != mapped.schema.types:
            raise ExpressionError(
                'Null reducer cannot have reducer output types %s' % reducer_output_types)
        return mapped

    ActualReducer = _gen_actual_reducer(reducer, group)
    return clustered.apply(ActualReducer, resources=reducer_resources,
                           names=reducer_output_names, types=reducer_output_types,
                           cu_request=reducer_cu)