#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import inspect
import os
import platform
import sys
import uuid
from collections import OrderedDict

from .types import df_type_to_odps_type
from ...expr.collections import RowAppliedCollectionExpr
from ...expr.element import MappedExpr
from ...expr.reduction import Aggregation, GroupedAggregation
from ...expr.utils import get_executed_collection_project_table_name
from ...utils import make_copy
from ....config import options
from ....lib import cloudpickle
from ....compat import six
from ....models import FileResource, TableResource, ArchiveResource
from ....utils import to_str

dirname = os.path.dirname(os.path.abspath(cloudpickle.__file__))
CLOUD_PICKLE_FILE = os.path.join(dirname, 'cloudpickle.py')
with open(CLOUD_PICKLE_FILE) as f:
    CLOUD_PICKLE = f.read()

IMPORT_FILE = os.path.join(dirname, 'importer.py')
with open(IMPORT_FILE) as f:
    MEM_IMPORT = f.read()

CLIENT_IMPL = '(%d, %d, "%s")' % (sys.version_info[0],
                                  sys.version_info[1],
                                  platform.python_implementation().lower())

X_NAMED_TUPLE_FILE = os.path.join(dirname, 'xnamedtuple.py')
with open(X_NAMED_TUPLE_FILE) as f:
    X_NAMED_TUPLE = f.read()


UDF_TMPL_HEADER = '''\
%(cloudpickle)s
%(memimport)s

%(xnamedtuple)s

try:
    # workaround for character not in range error
    import _strptime
except:
    pass

import sys
# avoid conflict between protobuf binaries
sys.setdlopenflags(10)

import base64
import inspect
import time
import os
try:
    import faulthandler
    faulthandler.enable(all_threads=True)
except ImportError:
    pass

from odps.udf import annotate
from odps.distcache import get_cache_file, get_cache_table, get_cache_archive

PY2 = sys.version_info[0] == 2

try:
    from odps.distcache import get_cache_archive_filenames
except ImportError:
    def get_cache_archive_filenames(name, relative_path='.'):
        from odps.distcache import WORK_DIR, DistributedCacheError

        def _is_parent(parent, child):
            return parent == child or child.startswith(parent + '/')

        if os.path.split(name)[0] != '':
            raise DistributedCacheError("Invalid resource name: " + name)
        ret_files = []

        # find the real resource path to avoid the symbol link in inner system
        resourcepath = os.path.realpath(os.path.join(WORK_DIR, name))

        # use realpath == abspath to check the symbol link
        dirpath = os.path.join(resourcepath, relative_path)
        if not os.path.exists(dirpath):
            raise DistributedCacheError("Invalid relative path, file not exists: " + relative_path)

        if os.path.realpath(dirpath) != os.path.abspath(dirpath):
            raise DistributedCacheError("Invalid relative path, relative path contains symlink: " + relative_path)

        if not _is_parent(resourcepath, dirpath):
            raise DistributedCacheError("Invalid relative path, path not correct in archive: " + relative_path)

        if not os.path.isdir(dirpath):
            return [dirpath]

        for root, dirs, files in os.walk(dirpath):
            for f in dirs:
                filename = os.path.join(root, f)
                if os.path.islink(filename):
                    relativename = os.path.relpath(filename, resourcepath)
                    raise DistributedCacheError("Invalid relative path, relative path contains symlink: " + relativename)

            for f in files:
                filename = os.path.join(root, f)
                if os.path.islink(filename):
                    relativename = os.path.relpath(filename, resourcepath)
                    raise DistributedCacheError("Invalid relative path, relative path contains symlink: " + relativename)
                ret_files.append(filename)
        return ret_files


def get_cache_archive_data(name, relative_path='.'):
    try:
        return [os.path.normpath(f) for f in get_cache_archive_filenames(name, relative_path)]
    except RuntimeError:
        return {os.path.normpath(fo.name): fo for fo in get_cache_archive(name, relative_path)}


class UnbufferedStream(object):
    def __init__(self, stream):
        self.stream = stream

    def write(self, data):
        self.stream.write(data)
        self.stream.flush()

    def writelines(self, datas):
        self.stream.writelines(datas)
        self.stream.flush()

    def __getattr__(self, attr):
        if attr != 'stream':
            return getattr(self.stream, attr)
        else:
            return object.__getattribute__(self, 'stream')

    def __setattr__(self, attr, value):
        if attr != 'stream':
            return setattr(self.stream, attr, value)
        else:
            return object.__setattr__(self, 'stream', value)

sys.stdout = UnbufferedStream(sys.stdout)


try:
    import socket
except ImportError:
    class MockSocketModule(object):
        _GLOBAL_DEFAULT_TIMEOUT = object()
        def __getattr__(self, item):
            raise AttributeError('Accessing attribute `{0}` of module `socket` is prohibited by sandbox.'.format(item))
    sys.modules['socket'] = MockSocketModule()


def gen_resource_data(fields, tb):
    named_args = xnamedtuple('NamedArgs', fields)
    for args in tb:
        yield named_args(*args)


def read_lib(lib, f):
    if isinstance(f, (list, dict)):
        return f
    if lib.endswith('.zip') or lib.endswith('.egg') or lib.endswith('.whl'):
        if PY2 and hasattr(f, "read"):
            return zipfile.ZipFile(f)
        else:
            return zipfile.ZipFile(f.name)
    if lib.endswith('.tar') or lib.endswith('.tar.gz') or lib.endswith('.tar.bz2'):
        from io import BytesIO
        if lib.endswith('.tar'):
            mode = 'r'
        else:
            mode = 'r:gz' if lib.endswith('.tar.gz') else 'r:bz2'
        if PY2 and hasattr(f, "read"):
            return tarfile.open(name='', fileobj=f, mode=mode)
        else:
            return tarfile.open(name=f.name, mode=mode)
    if lib.endswith('.py'):
        if PY2 and hasattr(f, "read"):
            return {f.name: f}
        else:
            return {f.name: open(f.name, 'rb')}

    raise ValueError(
        'Unknown library type which should be one of zip(egg, wheel), tar, or tar.gz')


# Use this method to make testThirdPartyLibraries happy
np_generic = None
def load_np_generic():
    global np_generic

    try:
        from numpy import generic
        np_generic = generic
    except ImportError:
        class PseudoNpGeneric(object):
            pass
        np_generic = PseudoNpGeneric
''' % {
    'cloudpickle': CLOUD_PICKLE,
    'memimport': MEM_IMPORT,
    'xnamedtuple': X_NAMED_TUPLE,
}


UDF_TMPL = '''

@annotate('%(from_type)s->%(to_type)s')
class %(func_cls_name)s(object):
    def __init__(self):
        unpickler_kw = dict(impl=%(implementation)s, dump_code=%(dump_code)s)
        rs = loads(base64.b64decode('%(resources)s'), **unpickler_kw)
        resources = []
        for t, n, fields in rs:
            if t == 'file':
                resources.append(get_cache_file(str(n)))
            elif t == 'archive':
                resources.append(get_cache_archive(str(n)))
            else:
                tb = get_cache_table(str(n))
                if fields:
                    tb = gen_resource_data(fields, tb)
                resources.append(tb)

        libraries = (l for l in '%(libraries)s'.split(',') if len(l) > 0)
        files = []
        for lib in libraries:
            if lib.startswith('a:'):
                lib = lib[2:]
                f = get_cache_archive_data(lib)
            else:
                f = get_cache_file(lib)
            files.append(read_lib(lib, f))
        sys.meta_path.append(CompressImporter(*files, supersede=%(supersede_libraries)r))

        load_np_generic()

        encoded = '%(func_str)s'
        f_str = base64.b64decode(encoded)
        self.f = loads(f_str, **unpickler_kw)

        if inspect.isclass(self.f):
            if resources:
                self.f = self.f(resources)
            else:
                self.f = self.f()
        else:
            if resources:
                self.f = self.f(resources)

        self.names = tuple(it for it in '%(names_str)s'.split(',') if it)
        if self.names:
            self.named_args = xnamedtuple('NamedArgs', self.names)

        encoded_func_args = '%(func_args_str)s'
        func_args_str = base64.b64decode(encoded_func_args)
        self.args = loads(func_args_str, **unpickler_kw) or tuple()

        encoded_func_kwargs = '%(func_kwargs_str)s'
        func_kwargs_str = base64.b64decode(encoded_func_kwargs)
        self.kwargs = loads(func_kwargs_str, **unpickler_kw) or dict()

        self.from_types = '%(raw_from_type)s'.split(',')
        self.to_type = '%(to_type)s'

    def _handle_input(self, args):
        from datetime import datetime
        from decimal import Decimal

        res = []
        for t, arg in zip(self.from_types, args):
            if PY2 and t == 'datetime' and arg is not None and not isinstance(arg, datetime):
                res.append(datetime.fromtimestamp(arg / 1000.0))
            elif t == 'decimal' and arg is not None and isinstance(arg, str):
                res.append(Decimal(arg))
            else:
                res.append(arg)
        return res

    def _to_milliseconds(self, dt):
        return int((time.mktime(dt.timetuple()) + dt.microsecond/1000000.0) * 1000)

    def _handle_output(self, arg):
        from datetime import datetime
        from decimal import Decimal

        t = self.to_type

        if PY2 and t == 'datetime' and isinstance(arg, datetime):
            if isinstance(arg, np_generic):
                arg = arg.item()
            return self._to_milliseconds(arg)
        elif t == 'string' and isinstance(arg, Decimal):
            return str(arg)
        else:
            if isinstance(arg, np_generic):
                arg = arg.item()
            return arg

    def evaluate(self, %(input_args)s):
        args = %(input_args)s,
        args = self._handle_input(args)
        if not self.names:
            args = tuple(args) + tuple(self.args)
            res = self.f(*args, **self.kwargs)
            return self._handle_output(res)
        else:
            res = self.f(self.named_args(*args), *self.args, **self.kwargs)
            return self._handle_output(res)
'''


UDTF_TMPL = '''
import functools
from odps.udf import BaseUDTF

if PY2:
    string_type = unicode
    byte_type = str
else:
    string_type = str
    byte_type = (bytes, bytearray)

@annotate('%(from_type)s->%(to_type)s')
class %(func_cls_name)s(BaseUDTF):
    def __init__(self):
        unpickler_kw = dict(impl=%(implementation)s, dump_code=%(dump_code)s)
        rs = loads(base64.b64decode('%(resources)s'), **unpickler_kw)
        resources = []
        for t, n, fields in rs:
            if t == 'file':
                resources.append(get_cache_file(str(n)))
            elif t == 'archive':
                resources.append(get_cache_archive(str(n)))
            else:
                tb = get_cache_table(str(n))
                if fields:
                    tb = gen_resource_data(fields, tb)
                resources.append(tb)

        libraries = (l for l in '%(libraries)s'.split(',') if len(l) > 0)
        files = []
        for lib in libraries:
            if lib.startswith('a:'):
                lib = lib[2:]
                f = get_cache_archive_data(lib)
            else:
                f = get_cache_file(lib)
            files.append(read_lib(lib, f))
        sys.meta_path.append(CompressImporter(*files, supersede=%(supersede_libraries)r))

        load_np_generic()

        encoded = '%(func_str)s'
        f_str = base64.b64decode(encoded)
        self.f = loads(f_str, **unpickler_kw)

        if inspect.isclass(self.f):
            if not resources:
                self.f = self.f()
            else:
                self.f = self.f(resources)
            self.is_f_generator = inspect.isgeneratorfunction(self.f.__call__)
            self.close_f = getattr(self.f, 'close', None)
            self.is_close_f_generator = inspect.isgeneratorfunction(self.close_f)
        else:
            if resources:
                self.f = self.f(resources)

            if isinstance(self.f, functools.partial):
                self.is_f_generator = inspect.isgeneratorfunction(self.f.func)
            else:
                self.is_f_generator = inspect.isgeneratorfunction(self.f)
            self.close_f = None
            self.is_close_f_generator = False

        encoded_func_args = '%(func_args_str)s'
        func_args_str = base64.b64decode(encoded_func_args)
        self.args = loads(func_args_str, **unpickler_kw) or tuple()

        encoded_func_kwargs = '%(func_kwargs_str)s'
        func_kwargs_str = base64.b64decode(encoded_func_kwargs)
        self.kwargs = loads(func_kwargs_str, **unpickler_kw) or dict()

        self.names = tuple(it for it in '%(names_str)s'.split(',') if it)
        if self.names:
            self.name_args = xnamedtuple('NamedArgs', self.names)

        self.from_types = '%(raw_from_type)s'.split(',')
        self.to_types = '%(to_type)s'.split(',')

    def _handle_input(self, args):
        from datetime import datetime
        from decimal import Decimal

        res = []
        for t, arg in zip(self.from_types, args):
            if PY2 and t == 'datetime' and arg is not None and not isinstance(arg, datetime):
                res.append(datetime.fromtimestamp(arg / 1000.0))
            elif t == 'decimal' and arg is not None and not isinstance(arg, Decimal):
                res.append(Decimal(arg))
            else:
                res.append(arg)
        return res

    def _to_milliseconds(self, dt):
        return int((time.mktime(dt.timetuple()) + dt.microsecond/1000000.0) * 1000)

    def _handle_output(self, args):
        from datetime import datetime
        from decimal import Decimal

        if len(self.to_types) != len(args):
            raise ValueError('Function output size should be ' + str(len(self.to_types))
                + ', got ' + str(args))

        res = []
        for t, arg in zip(self.to_types, args):
            if PY2 and t == 'datetime' and isinstance(arg, datetime):
                if isinstance(arg, np_generic):
                    arg = arg.item()
                res.append(self._to_milliseconds(arg))
            elif t == 'string' and isinstance(arg, Decimal):
                res.append(str(arg))
            elif PY2 and t == 'string' and isinstance(arg, string_type):
                res.append(arg.encode('utf-8'))
            elif PY3 and t == 'string' and isinstance(arg, byte_type):
                res.append(arg.decode('utf-8'))
            else:
                if isinstance(arg, np_generic):
                    arg = arg.item()
                res.append(arg)
        return res

    def process(self, %(input_args)s):
        args = %(input_args)s,
        args = self._handle_input(args)
        if not self.names:
            args = tuple(args) + tuple(self.args)
        else:
            args = (self.name_args(*args), ) + tuple(self.args)

        if self.is_f_generator:
            for r in self.f(*args, **self.kwargs):
                if not isinstance(r, (list, tuple)):
                    r = (r, )
                self.forward(*self._handle_output(r))
        else:
            res = self.f(*args, **self.kwargs)
            if res:
                if not isinstance(res, (list, tuple)):
                    res = (res, )
                self.forward(*self._handle_output(res))

    def close(self):
        if not self.close_f:
            return

        if self.is_close_f_generator:
            for r in self.close_f(*self.args, **self.kwargs):
                if not isinstance(r, (list, tuple)):
                    r = (r, )
                self.forward(*self._handle_output(r))
        else:
            res = self.close_f(*self.args, **self.kwargs)
            if res:
                if not isinstance(res, (list, tuple)):
                    res = (res, )
                self.forward(*self._handle_output(res))
'''

UDAF_TMPL = '''
from odps.udf import BaseUDAF

@annotate('%(from_type)s->%(to_type)s')
class %(func_cls_name)s(BaseUDAF):
    def __init__(self):
        unpickler_kw = dict(impl=%(implementation)s, dump_code=%(dump_code)s)
        rs = loads(base64.b64decode('%(resources)s'), **unpickler_kw)
        resources = []
        for t, n, fields in rs:
            if t == 'file':
                resources.append(get_cache_file(str(n)))
            elif t == 'archive':
                resources.append(get_cache_archive(str(n)))
            else:
                tb = get_cache_table(str(n))
                if fields:
                    tb = gen_resource_data(fields, tb)
                resources.append(tb)

        libraries = (l for l in '%(libraries)s'.split(',') if len(l) > 0)
        files = []
        for lib in libraries:
            if lib.startswith('a:'):
                lib = lib[2:]
                f = get_cache_archive_data(lib)
            else:
                f = get_cache_file(lib)
            files.append(read_lib(lib, f))
        sys.meta_path.append(CompressImporter(*files, supersede=%(supersede_libraries)r))

        load_np_generic()

        encoded_func_args = '%(func_args_str)s'
        func_args_str = base64.b64decode(encoded_func_args)
        args = loads(func_args_str, **unpickler_kw) or tuple()

        encoded_func_kwargs = '%(func_kwargs_str)s'
        func_kwargs_str = base64.b64decode(encoded_func_kwargs)
        kwargs = loads(func_kwargs_str, **unpickler_kw) or dict()

        encoded = '%(func_str)s'
        f_str = base64.b64decode(encoded)
        agg = loads(f_str, **unpickler_kw)
        if resources:
            if not args and not kwargs:
                self.f = agg(resources)
            else:
                kwargs['resources'] = resources
                self.f = agg(*args, **kwargs)
        else:
            self.f = agg(*args, **kwargs)

        self.from_types = '%(raw_from_type)s'.split(',')
        self.to_type = '%(to_type)s'

    def _handle_input(self, args):
        from datetime import datetime
        from decimal import Decimal

        res = []
        for t, arg in zip(self.from_types, args):
            if PY2 and t == 'datetime' and arg is not None and not isinstance(arg, datetime):
                res.append(datetime.fromtimestamp(arg / 1000.0))
            elif t == 'decimal' and arg is not None and not isinstance(arg, Decimal):
                res.append(Decimal(arg))
            else:
                res.append(arg)
        return res

    def _to_milliseconds(self, dt):
        return int((time.mktime(dt.timetuple()) + dt.microsecond/1000000.0) * 1000)

    def _handle_output(self, arg):
        from datetime import datetime
        from decimal import Decimal

        t = self.to_type

        if PY2 and t == 'datetime' and isinstance(arg, datetime):
            if isinstance(arg, np_generic):
                arg = arg.item()
            return self._to_milliseconds(arg)
        elif t == 'string' and isinstance(arg, Decimal):
            return str(arg)
        else:
            if isinstance(arg, np_generic):
                arg = arg.item()
            return arg

    def new_buffer(self):
        return self.f.buffer()

    def iterate(self, buffer, %(input_args)s):
        args = %(input_args)s,
        args = self._handle_input(args)
        self.f(buffer, *args)

    def merge(self, buffer, pbuffer):
        self.f.merge(buffer, pbuffer)

    def terminate(self, buffer):
        res = self.f.getvalue(buffer)
        return self._handle_output(res)
'''


def _gen_map_udf(node, func_cls_name, libraries, func, resources,
                 func_to_udfs, func_to_resources, func_to_cus, func_params):
    names_str = ''
    if isinstance(node, MappedExpr) and node._multiple and \
            all(f.name is not None for f in node.inputs):
        names_str = ','.join(f.name for f in node.inputs)
    from_type = ','.join(df_type_to_odps_type(t).name.replace('`', '') for t in node.input_types)
    to_type = df_type_to_odps_type(node.dtype).name.replace('`', '')
    raw_from_type = ','.join(
        df_type_to_odps_type(t).name.replace('`', '') for t in node.raw_input_types
    )
    func_args_str = to_str(
        base64.b64encode(cloudpickle.dumps(node._func_args, dump_code=options.df.dump_udf))
    )
    func_kwargs_str = to_str(
        base64.b64encode(cloudpickle.dumps(node._func_kwargs, dump_code=options.df.dump_udf))
    )

    if inspect.isfunction(func) and not func.__closure__:
        func_sig = id(func.__code__)
    else:
        func_sig = func

    key = (from_type, to_type, func_sig, tuple(resources), names_str, func_args_str, func_kwargs_str)
    if key in func_params:
        node.func = func_params[key]
        return
    else:
        if func in func_to_udfs:
            func = make_copy(func)
            node.func = func

        func_params[key] = func

    func_to_udfs[func] = UDF_TMPL_HEADER + UDF_TMPL % {
        'raw_from_type': raw_from_type,
        'from_type': from_type,
        'to_type': to_type,
        'func_cls_name': func_cls_name,
        'func_str': to_str(base64.b64encode(cloudpickle.dumps(func, dump_code=options.df.dump_udf))),
        'func_args_str': func_args_str,
        'func_kwargs_str': func_kwargs_str,
        'names_str': names_str,
        'resources': to_str(
            base64.b64encode(cloudpickle.dumps([r[:3] for r in resources], dump_code=options.df.dump_udf))),
        'implementation': CLIENT_IMPL,
        'dump_code': options.df.dump_udf,
        'input_args': ', '.join('arg{0}'.format(i) for i in range(len(node.input_types))),
        'libraries': ','.join(libraries if libraries is not None else []),
        'supersede_libraries': options.df.supersede_libraries,
    }
    if resources:
        func_to_resources[func] = resources
    if node._cu_request:
        func_to_cus[func] = node._cu_request


def _gen_apply_udf(node, func_cls_name, libraries, func, resources,
                   func_to_udfs, func_to_resources, func_to_cus, func_params):
    names_str = ','.join(f.name for f in node.fields)
    from_type = ','.join(df_type_to_odps_type(t).name.replace('`', '') for t in node.input_types)
    raw_from_type = ','.join(
        df_type_to_odps_type(t).name.replace('`', '') for t in node.raw_input_types
    )
    to_type = ','.join(df_type_to_odps_type(t).name.replace('`', '') for t in node.schema.types)
    func_args_str = to_str(
        base64.b64encode(cloudpickle.dumps(node._func_args, dump_code=options.df.dump_udf)))
    func_kwargs_str = to_str(
        base64.b64encode(cloudpickle.dumps(node._func_kwargs, dump_code=options.df.dump_udf)))

    key = (from_type, to_type, func, tuple(resources), names_str, func_args_str, func_kwargs_str)
    if key in func_params:
        return
    else:
        if func in func_to_udfs:
            func = make_copy(func)
            node.func = func

        func_params[key] = func

    func_to_udfs[func] = UDF_TMPL_HEADER + UDTF_TMPL % {
        'raw_from_type': raw_from_type,
        'from_type': from_type,
        'to_type': to_type,
        'func_cls_name': func_cls_name,
        'func_str': to_str(base64.b64encode(cloudpickle.dumps(func, dump_code=options.df.dump_udf))),
        'func_args_str': func_args_str,
        'func_kwargs_str': func_kwargs_str,
        'close_func_str': to_str(
            base64.b64encode(cloudpickle.dumps(getattr(node, '_close_func', None), dump_code=options.df.dump_udf))),
        'names_str': names_str,
        'resources': to_str(base64.b64encode(cloudpickle.dumps([r[:3] for r in resources]))),
        'implementation': CLIENT_IMPL,
        'dump_code': options.df.dump_udf,
        'input_args': ', '.join('arg{0}'.format(i) for i in range(len(node.input_types))),
        'libraries': ','.join(libraries if libraries is not None else []),
        'supersede_libraries': options.df.supersede_libraries,
    }
    if resources:
        func_to_resources[func] = resources
    if node._cu_request:
        func_to_cus[func] = node._cu_request


def _gen_agg_udf(node, func_cls_name, libraries, func, resources,
                 func_to_udfs, func_to_resources, func_to_cus, func_params):
    from_type = ','.join(df_type_to_odps_type(t).name.replace('`', '') for t in node.input_types)
    raw_from_type = ','.join(
        df_type_to_odps_type(t).name.replace('`', '') for t in node.raw_input_types
    )
    to_type = df_type_to_odps_type(node.dtype).name.replace('`', '')
    func_args_str = to_str(
        base64.b64encode(cloudpickle.dumps(node._func_args, dump_code=options.df.dump_udf)))
    func_kwargs_str = to_str(
        base64.b64encode(cloudpickle.dumps(node._func_kwargs, dump_code=options.df.dump_udf)))

    key = (from_type, to_type, func, tuple(resources), func_args_str, func_kwargs_str)
    if key in func_params:
        return
    else:
        if func in func_to_udfs:
            func = make_copy(func)
            node.func = func

        func_params[key] = func

    func_to_udfs[func] = UDF_TMPL_HEADER + UDAF_TMPL % {
        'raw_from_type': raw_from_type,
        'from_type': from_type,
        'to_type': to_type,
        'func_cls_name': func_cls_name,
        'func_str': to_str(base64.b64encode(cloudpickle.dumps(func, dump_code=options.df.dump_udf))),
        'func_args_str': func_args_str,
        'func_kwargs_str': func_kwargs_str,
        'resources': to_str(
            base64.b64encode(cloudpickle.dumps([r[:3] for r in resources], dump_code=options.df.dump_udf))),
        'implementation': CLIENT_IMPL,
        'dump_code': options.df.dump_udf,
        'input_args': ', '.join('arg{0}'.format(i) for i in range(len(node.input_types))),
        'libraries': ','.join(libraries if libraries is not None else []),
        'supersede_libraries': options.df.supersede_libraries,
    }
    if resources:
        func_to_resources[func] = resources
    if node._cu_request:
        func_to_cus[func] = node._cu_request


def gen_udf(expr, func_cls_name=None, libraries=None):
    func_to_udfs = OrderedDict()
    func_to_resources = OrderedDict()
    func_to_cus = OrderedDict()
    func_params = dict()
    if libraries is not None:
        def _get_library_name(res):
            if isinstance(res, six.string_types):
                return res
            elif isinstance(res, ArchiveResource):
                return 'a:' + res.name
            else:
                return res.name

        libraries = [_get_library_name(lib) for lib in libraries]

    for node in expr.traverse(unique=True):
        func = getattr(node, 'func', None)
        if func is None:
            continue
        if isinstance(func, six.string_types):
            continue

        resources = []
        collection_idx = 0
        if hasattr(node, '_resources') and node._resources:
            for res in node._resources:
                if isinstance(res, ArchiveResource):
                    tp = 'archive'
                    name = res.name
                    fields = None
                    create = False
                    table_name = None
                elif isinstance(res, FileResource):
                    tp = 'file'
                    name = res.name
                    fields = None
                    create = False
                    table_name = None
                elif isinstance(res, TableResource):
                    tp = 'table'
                    name = res.name
                    fields = tuple(col.name for col in res.get_source_table().table_schema.simple_columns)
                    create = False
                    table_name = None
                else:
                    res = node._collection_resources[collection_idx]
                    collection_idx += 1

                    tp = 'table'
                    name = 'tmp_pyodps_resource_%s' % (uuid.uuid4())
                    fields = tuple(res.schema.names)
                    create = True
                    table_name = get_executed_collection_project_table_name(res)
                resources.append((tp, name, fields, create, table_name))

        if isinstance(node, MappedExpr):
            _gen_map_udf(node, func_cls_name, libraries, func, resources,
                         func_to_udfs, func_to_resources, func_to_cus, func_params)
        elif isinstance(node, RowAppliedCollectionExpr):
            _gen_apply_udf(node, func_cls_name, libraries, func, resources,
                           func_to_udfs, func_to_resources, func_to_cus, func_params)
        elif isinstance(node, (Aggregation, GroupedAggregation)):
            _gen_agg_udf(node, func_cls_name, libraries, func, resources,
                         func_to_udfs, func_to_resources, func_to_cus, func_params)

    return func_to_udfs, func_to_resources, func_to_cus
