def visit_apply_collection()

in odps/df/backends/pd/compiler.py [0:0]


    def visit_apply_collection(self, expr):
        """Register the pandas handler that evaluates an apply-collection node.

        The registered handler calls ``expr._func`` once per input row and
        collects whatever it produces into a DataFrame whose columns are
        ``expr.schema.names``.

        :param expr: expression node; the handler reads ``fields``, ``_func``,
            ``_func_args``, ``_func_kwargs``, ``_keep_nulls``,
            ``_lateral_view`` and ``schema`` from it, and replaces empty
            ``_func_args`` / ``_func_kwargs`` with ``()`` / ``{}`` in place.
        :raises NotImplementedError: raised by the handler when ``expr._func``
            is neither a plain function nor an object with ``__call__``.
        """

        def conv(value):
            # Normalise one produced row to a list: tuples are converted,
            # scalars are wrapped, lists pass through unchanged.
            if isinstance(value, tuple):
                return list(value)
            if not isinstance(value, list):
                return [value]
            return value

        def handle(kw):
            resources = self._get_resources(expr, kw)

            input_df = self._merge_values(expr.fields, kw)

            names = [f.name for f in expr.fields]
            row_type = xnamedtuple('NamedArgs', names)

            expr._func_args = expr._func_args or ()
            expr._func_kwargs = expr._func_kwargs or {}

            func = expr._func
            self._attempt_pickle_unpickle(func, expr._func_args, expr._func_kwargs)

            func_args = expr._func_args
            func_kwargs = expr._func_kwargs
            out_names = expr.schema.names

            if isinstance(func, six.string_types) and func.upper() in BUILTIN_FUNCS:
                func = BUILTIN_FUNCS[func.upper()]
            if inspect.isfunction(func):
                if resources:
                    # With resources, the function is called once with them
                    # and its return value becomes the per-row callable.
                    func = func(resources)

                is_generator_function = inspect.isgeneratorfunction(func)
                close_func = None
                is_close_generator_function = False
            elif hasattr(func, '__call__'):
                # Non-function callable (presumably a class): call it to get
                # the object that is then invoked per row.
                if resources:
                    func = func(resources)
                else:
                    func = func()
                is_generator_function = inspect.isgeneratorfunction(func.__call__)
                close_func = getattr(func, 'close', None)
                is_close_generator_function = inspect.isgeneratorfunction(close_func)
            else:
                raise NotImplementedError

            rows = []
            indices = []
            # One past the largest input index seen; used as the index of
            # rows produced by ``close``.
            next_index = 0
            for index, values in input_df.iterrows():
                row = row_type(*values)
                res = func(row, *func_args, **func_kwargs)
                expand_num = 0
                if is_generator_function:
                    for item in res:
                        rows.append(conv(item))
                        expand_num += 1
                elif res:
                    # NOTE: any falsy result (None, 0, '', empty sequence)
                    # is treated as "no output row" for this input row.
                    rows.append(conv(res))
                    expand_num += 1
                if expand_num == 0 and expr._keep_nulls:
                    # The placeholder row must be as wide as the *output*
                    # schema, which may differ from the number of input fields.
                    rows.append([None] * len(out_names))
                    expand_num += 1
                # Every produced row carries the index of its source row.
                indices.extend([index] * expand_num)
                next_index = max(next_index, index + 1)
            if close_func:
                expand_num = 0
                if is_close_generator_function:
                    for item in close_func(*func_args, **func_kwargs):
                        rows.append(conv(item))
                        expand_num += 1
                else:
                    closing = close_func(*func_args, **func_kwargs)
                    # A ``close`` that returns nothing contributes no row;
                    # anything else is normalised like every other row.
                    if closing is not None:
                        rows.append(conv(closing))
                        expand_num += 1
                indices.extend([next_index] * expand_num)
            if expr._lateral_view:
                # ``pd.Int64Index`` no longer exists in pandas >= 2.0;
                # ``pd.Index(..., dtype='int64')`` is equivalent on old and
                # new versions.
                out_df = pd.DataFrame(rows, columns=out_names,
                                      index=pd.Index(indices, dtype='int64'))
            else:
                out_df = pd.DataFrame(rows, columns=out_names)
            return self._check_output_types(out_df, expr.schema.types)

        self._add_node(expr, handle)