in odps/df/backends/pd/compiler.py [0:0]
def visit_apply_collection(self, expr):
    """Register the handler that evaluates an apply-collection expression.

    The handler calls ``expr._func`` once for every row of the merged input
    frame and gathers everything it produces into a new
    ``pandas.DataFrame`` whose columns are ``expr.schema.names``.

    ``expr._func`` may be:

    * a string naming an entry of ``BUILTIN_FUNCS`` (matched upper-cased);
    * a plain function; when resources are present it is first called with
      them and its return value becomes the per-row callable;
    * any other callable (for instance a class), which is invoked with the
      resources, or with no arguments when there are none, to obtain the
      object called per row.  If that object has a ``close`` attribute it
      is called once after the last row and may contribute extra rows.

    Generator functions may yield any number of rows per call; a
    non-generator contributes at most one row, and a falsy return value
    contributes none.  When a row produces nothing and ``expr._keep_nulls``
    is set, a single all-``None`` row is emitted in its place.

    When ``expr._lateral_view`` is set the output frame is indexed by the
    label of the input row each output row came from; rows produced by
    ``close`` get a label one past the largest input label seen.

    :param expr: the apply-collection expression to compile
    :raises NotImplementedError: (from the handler) when ``expr._func`` is
        neither a function nor a callable object
    """

    def conv(value):
        # Normalise one produced row into a list of column values.
        if isinstance(value, tuple):
            return list(value)
        if not isinstance(value, list):
            return [value]
        return value

    def handle(kw):
        resources = self._get_resources(expr, kw)
        input_df = self._merge_values(expr.fields, kw)

        names = [f.name for f in expr.fields]
        row_type = xnamedtuple('NamedArgs', names)

        # Normalise missing extra arguments in place on the expression.
        expr._func_args = expr._func_args or ()
        expr._func_kwargs = expr._func_kwargs or {}
        func_args = expr._func_args
        func_kwargs = expr._func_kwargs

        func = expr._func
        self._attempt_pickle_unpickle(func, func_args, func_kwargs)
        if isinstance(func, six.string_types) and func.upper() in BUILTIN_FUNCS:
            func = BUILTIN_FUNCS[func.upper()]

        if inspect.isfunction(func):
            if resources:
                func = func(resources)
            is_generator_function = inspect.isgeneratorfunction(func)
            close_func = None
            is_close_generator_function = False
        elif hasattr(func, '__call__'):
            func = func(resources) if resources else func()
            is_generator_function = inspect.isgeneratorfunction(func.__call__)
            close_func = getattr(func, 'close', None)
            is_close_generator_function = inspect.isgeneratorfunction(close_func)
        else:
            raise NotImplementedError

        # A filler row has to match the width of the *output* columns the
        # rows are loaded with below, not the number of input fields.
        null_row_width = len(expr.schema.names)

        rows = []
        indices = []
        idx = 0
        for label, values in input_df.iterrows():
            row = row_type(*values)
            res = func(row, *func_args, **func_kwargs)
            if is_generator_function:
                produced = [conv(item) for item in res]
            elif res:
                # NOTE: truthiness is the long-standing contract here, so a
                # falsy return (None, empty list, 0, ...) yields no row.
                produced = [conv(res)]
            else:
                produced = []
            if not produced and expr._keep_nulls:
                produced = [[None] * null_row_width]
            rows.extend(produced)
            indices.extend([label] * len(produced))
            idx = max(idx, label + 1)

        if close_func:
            if is_close_generator_function:
                tail = [conv(item)
                        for item in close_func(*func_args, **func_kwargs)]
            else:
                # Treat the result like a per-row result: normalise it and
                # skip it when empty, so a cleanup-only ``close`` returning
                # None does not inject a bogus row.
                res = close_func(*func_args, **func_kwargs)
                tail = [conv(res)] if res else []
            rows.extend(tail)
            indices.extend([idx] * len(tail))

        if expr._lateral_view:
            # pd.Int64Index no longer exists in pandas >= 2.0; an int64
            # pd.Index is the equivalent on old and new versions alike.
            out_df = pd.DataFrame(rows, columns=expr.schema.names,
                                  index=pd.Index(indices, dtype='int64'))
        else:
            out_df = pd.DataFrame(rows, columns=expr.schema.names)
        return self._check_output_types(out_df, expr.schema.types)

    self._add_node(expr, handle)