odps/df/backends/odpssql/codegen.py (675 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import inspect
import os
import platform
import sys
import uuid
from collections import OrderedDict
from .types import df_type_to_odps_type
from ...expr.collections import RowAppliedCollectionExpr
from ...expr.element import MappedExpr
from ...expr.reduction import Aggregation, GroupedAggregation
from ...expr.utils import get_executed_collection_project_table_name
from ...utils import make_copy
from ....config import options
from ....lib import cloudpickle
from ....compat import six
from ....models import FileResource, TableResource, ArchiveResource
from ....utils import to_str
dirname = os.path.dirname(os.path.abspath(cloudpickle.__file__))
CLOUD_PICKLE_FILE = os.path.join(dirname, 'cloudpickle.py')
with open(CLOUD_PICKLE_FILE) as f:
    CLOUD_PICKLE = f.read()
IMPORT_FILE = os.path.join(dirname, 'importer.py')
with open(IMPORT_FILE) as f:
    MEM_IMPORT = f.read()

CLIENT_IMPL = '(%d, %d, "%s")' % (sys.version_info[0],
                                  sys.version_info[1],
                                  platform.python_implementation().lower())

X_NAMED_TUPLE_FILE = os.path.join(dirname, 'xnamedtuple.py')
with open(X_NAMED_TUPLE_FILE) as f:
    X_NAMED_TUPLE = f.read()
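
# Preamble prepended to every generated UDF source: it inlines cloudpickle, the
# in-memory importer and xnamedtuple, works around MaxCompute sandbox limitations
# (unbuffered stdout, missing socket module), and defines helpers for loading
# cached resources and bundled third-party libraries.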
UDF_TMPL_HEADER = '''\
%(cloudpickle)s
%(memimport)s
%(xnamedtuple)s

try:
    # workaround for character not in range error
    import _strptime
except:
    pass

import sys
# avoid conflict between protobuf binaries
sys.setdlopenflags(10)

import base64
import inspect
import time
import os
# zipfile and tarfile are needed by read_lib below
import zipfile
import tarfile

try:
    import faulthandler
    faulthandler.enable(all_threads=True)
except ImportError:
    pass

from odps.udf import annotate
from odps.distcache import get_cache_file, get_cache_table, get_cache_archive

PY2 = sys.version_info[0] == 2
PY3 = not PY2

try:
    from odps.distcache import get_cache_archive_filenames
except ImportError:
    def get_cache_archive_filenames(name, relative_path='.'):
        from odps.distcache import WORK_DIR, DistributedCacheError

        def _is_parent(parent, child):
            return parent == child or child.startswith(parent + '/')

        if os.path.split(name)[0] != '':
            raise DistributedCacheError("Invalid resource name: " + name)

        ret_files = []
        # find the real resource path to avoid the symbol link in inner system
        resourcepath = os.path.realpath(os.path.join(WORK_DIR, name))
        # use realpath == abspath to check the symbol link
        dirpath = os.path.join(resourcepath, relative_path)
        if not os.path.exists(dirpath):
            raise DistributedCacheError("Invalid relative path, file not exists: " + relative_path)
        if os.path.realpath(dirpath) != os.path.abspath(dirpath):
            raise DistributedCacheError("Invalid relative path, relative path contains symlink: " + relative_path)
        if not _is_parent(resourcepath, dirpath):
            raise DistributedCacheError("Invalid relative path, path not correct in archive: " + relative_path)
        if not os.path.isdir(dirpath):
            return [dirpath]

        for root, dirs, files in os.walk(dirpath):
            for f in dirs:
                filename = os.path.join(root, f)
                if os.path.islink(filename):
                    relativename = os.path.relpath(filename, resourcepath)
                    raise DistributedCacheError("Invalid relative path, relative path contains symlink: " + relativename)
            for f in files:
                filename = os.path.join(root, f)
                if os.path.islink(filename):
                    relativename = os.path.relpath(filename, resourcepath)
                    raise DistributedCacheError("Invalid relative path, relative path contains symlink: " + relativename)
                ret_files.append(filename)
        return ret_files


def get_cache_archive_data(name, relative_path='.'):
    try:
        return [os.path.normpath(f) for f in get_cache_archive_filenames(name, relative_path)]
    except RuntimeError:
        return {os.path.normpath(fo.name): fo for fo in get_cache_archive(name, relative_path)}


class UnbufferedStream(object):
    def __init__(self, stream):
        self.stream = stream

    def write(self, data):
        self.stream.write(data)
        self.stream.flush()

    def writelines(self, datas):
        self.stream.writelines(datas)
        self.stream.flush()

    def __getattr__(self, attr):
        if attr != 'stream':
            return getattr(self.stream, attr)
        else:
            return object.__getattribute__(self, 'stream')

    def __setattr__(self, attr, value):
        if attr != 'stream':
            return setattr(self.stream, attr, value)
        else:
            return object.__setattr__(self, 'stream', value)


sys.stdout = UnbufferedStream(sys.stdout)

try:
    import socket
except ImportError:
    class MockSocketModule(object):
        _GLOBAL_DEFAULT_TIMEOUT = object()

        def __getattr__(self, item):
            raise AttributeError('Accessing attribute `{0}` of module `socket` is prohibited by sandbox.'.format(item))

    sys.modules['socket'] = MockSocketModule()


def gen_resource_data(fields, tb):
    named_args = xnamedtuple('NamedArgs', fields)
    for args in tb:
        yield named_args(*args)


def read_lib(lib, f):
    if isinstance(f, (list, dict)):
        return f
    if lib.endswith('.zip') or lib.endswith('.egg') or lib.endswith('.whl'):
        if PY2 and hasattr(f, "read"):
            return zipfile.ZipFile(f)
        else:
            return zipfile.ZipFile(f.name)
    if lib.endswith('.tar') or lib.endswith('.tar.gz') or lib.endswith('.tar.bz2'):
        from io import BytesIO
        if lib.endswith('.tar'):
            mode = 'r'
        else:
            mode = 'r:gz' if lib.endswith('.tar.gz') else 'r:bz2'
        if PY2 and hasattr(f, "read"):
            return tarfile.open(name='', fileobj=f, mode=mode)
        else:
            return tarfile.open(name=f.name, mode=mode)
    if lib.endswith('.py'):
        if PY2 and hasattr(f, "read"):
            return {f.name: f}
        else:
            return {f.name: open(f.name, 'rb')}
    raise ValueError(
        'Unknown library type which should be one of zip(egg, wheel), tar, or tar.gz')


# Use this method to make testThirdPartyLibraries happy
np_generic = None


def load_np_generic():
    global np_generic

    try:
        from numpy import generic
        np_generic = generic
    except ImportError:
        class PseudoNpGeneric(object):
            pass

        np_generic = PseudoNpGeneric
''' % {
    'cloudpickle': CLOUD_PICKLE,
    'memimport': MEM_IMPORT,
    'xnamedtuple': X_NAMED_TUPLE,
}
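
# Template for scalar UDFs generated from map-like expressions; the pickled user
# function is decoded in __init__ and invoked once per row in evaluate().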
UDF_TMPL = '''
@annotate('%(from_type)s->%(to_type)s')
class %(func_cls_name)s(object):
    def __init__(self):
        unpickler_kw = dict(impl=%(implementation)s, dump_code=%(dump_code)s)
        rs = loads(base64.b64decode('%(resources)s'), **unpickler_kw)
        resources = []
        for t, n, fields in rs:
            if t == 'file':
                resources.append(get_cache_file(str(n)))
            elif t == 'archive':
                resources.append(get_cache_archive(str(n)))
            else:
                tb = get_cache_table(str(n))
                if fields:
                    tb = gen_resource_data(fields, tb)
                resources.append(tb)

        libraries = (l for l in '%(libraries)s'.split(',') if len(l) > 0)
        files = []
        for lib in libraries:
            if lib.startswith('a:'):
                lib = lib[2:]
                f = get_cache_archive_data(lib)
            else:
                f = get_cache_file(lib)
            files.append(read_lib(lib, f))
        sys.meta_path.append(CompressImporter(*files, supersede=%(supersede_libraries)r))
        load_np_generic()

        encoded = '%(func_str)s'
        f_str = base64.b64decode(encoded)
        self.f = loads(f_str, **unpickler_kw)
        if inspect.isclass(self.f):
            if resources:
                self.f = self.f(resources)
            else:
                self.f = self.f()
        else:
            if resources:
                self.f = self.f(resources)

        self.names = tuple(it for it in '%(names_str)s'.split(',') if it)
        if self.names:
            self.named_args = xnamedtuple('NamedArgs', self.names)
        encoded_func_args = '%(func_args_str)s'
        func_args_str = base64.b64decode(encoded_func_args)
        self.args = loads(func_args_str, **unpickler_kw) or tuple()
        encoded_func_kwargs = '%(func_kwargs_str)s'
        func_kwargs_str = base64.b64decode(encoded_func_kwargs)
        self.kwargs = loads(func_kwargs_str, **unpickler_kw) or dict()

        self.from_types = '%(raw_from_type)s'.split(',')
        self.to_type = '%(to_type)s'

    def _handle_input(self, args):
        from datetime import datetime
        from decimal import Decimal

        res = []
        for t, arg in zip(self.from_types, args):
            if PY2 and t == 'datetime' and arg is not None and not isinstance(arg, datetime):
                res.append(datetime.fromtimestamp(arg / 1000.0))
            elif t == 'decimal' and arg is not None and isinstance(arg, str):
                res.append(Decimal(arg))
            else:
                res.append(arg)
        return res

    def _to_milliseconds(self, dt):
        return int((time.mktime(dt.timetuple()) + dt.microsecond/1000000.0) * 1000)

    def _handle_output(self, arg):
        from datetime import datetime
        from decimal import Decimal

        t = self.to_type
        if PY2 and t == 'datetime' and isinstance(arg, datetime):
            if isinstance(arg, np_generic):
                arg = arg.item()
            return self._to_milliseconds(arg)
        elif t == 'string' and isinstance(arg, Decimal):
            return str(arg)
        else:
            if isinstance(arg, np_generic):
                arg = arg.item()
            return arg

    def evaluate(self, %(input_args)s):
        args = %(input_args)s,
        args = self._handle_input(args)
        if not self.names:
            args = tuple(args) + tuple(self.args)
            res = self.f(*args, **self.kwargs)
            return self._handle_output(res)
        else:
            res = self.f(self.named_args(*args), *self.args, **self.kwargs)
            return self._handle_output(res)
'''
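
# Template for UDTFs generated from row-applied collection expressions; process()
# forwards one or more output rows per input row and close() flushes any final rows.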
UDTF_TMPL = '''
import functools

from odps.udf import BaseUDTF

if PY2:
    string_type = unicode
    byte_type = str
else:
    string_type = str
    byte_type = (bytes, bytearray)


@annotate('%(from_type)s->%(to_type)s')
class %(func_cls_name)s(BaseUDTF):
    def __init__(self):
        unpickler_kw = dict(impl=%(implementation)s, dump_code=%(dump_code)s)
        rs = loads(base64.b64decode('%(resources)s'), **unpickler_kw)
        resources = []
        for t, n, fields in rs:
            if t == 'file':
                resources.append(get_cache_file(str(n)))
            elif t == 'archive':
                resources.append(get_cache_archive(str(n)))
            else:
                tb = get_cache_table(str(n))
                if fields:
                    tb = gen_resource_data(fields, tb)
                resources.append(tb)

        libraries = (l for l in '%(libraries)s'.split(',') if len(l) > 0)
        files = []
        for lib in libraries:
            if lib.startswith('a:'):
                lib = lib[2:]
                f = get_cache_archive_data(lib)
            else:
                f = get_cache_file(lib)
            files.append(read_lib(lib, f))
        sys.meta_path.append(CompressImporter(*files, supersede=%(supersede_libraries)r))
        load_np_generic()

        encoded = '%(func_str)s'
        f_str = base64.b64decode(encoded)
        self.f = loads(f_str, **unpickler_kw)
        if inspect.isclass(self.f):
            if not resources:
                self.f = self.f()
            else:
                self.f = self.f(resources)
            self.is_f_generator = inspect.isgeneratorfunction(self.f.__call__)
            self.close_f = getattr(self.f, 'close', None)
            self.is_close_f_generator = inspect.isgeneratorfunction(self.close_f)
        else:
            if resources:
                self.f = self.f(resources)
            if isinstance(self.f, functools.partial):
                self.is_f_generator = inspect.isgeneratorfunction(self.f.func)
            else:
                self.is_f_generator = inspect.isgeneratorfunction(self.f)
            self.close_f = None
            self.is_close_f_generator = False

        encoded_func_args = '%(func_args_str)s'
        func_args_str = base64.b64decode(encoded_func_args)
        self.args = loads(func_args_str, **unpickler_kw) or tuple()
        encoded_func_kwargs = '%(func_kwargs_str)s'
        func_kwargs_str = base64.b64decode(encoded_func_kwargs)
        self.kwargs = loads(func_kwargs_str, **unpickler_kw) or dict()

        self.names = tuple(it for it in '%(names_str)s'.split(',') if it)
        if self.names:
            self.name_args = xnamedtuple('NamedArgs', self.names)

        self.from_types = '%(raw_from_type)s'.split(',')
        self.to_types = '%(to_type)s'.split(',')

    def _handle_input(self, args):
        from datetime import datetime
        from decimal import Decimal

        res = []
        for t, arg in zip(self.from_types, args):
            if PY2 and t == 'datetime' and arg is not None and not isinstance(arg, datetime):
                res.append(datetime.fromtimestamp(arg / 1000.0))
            elif t == 'decimal' and arg is not None and not isinstance(arg, Decimal):
                res.append(Decimal(arg))
            else:
                res.append(arg)
        return res

    def _to_milliseconds(self, dt):
        return int((time.mktime(dt.timetuple()) + dt.microsecond/1000000.0) * 1000)

    def _handle_output(self, args):
        from datetime import datetime
        from decimal import Decimal

        if len(self.to_types) != len(args):
            raise ValueError('Function output size should be ' + str(len(self.to_types))
                             + ', got ' + str(args))

        res = []
        for t, arg in zip(self.to_types, args):
            if PY2 and t == 'datetime' and isinstance(arg, datetime):
                if isinstance(arg, np_generic):
                    arg = arg.item()
                res.append(self._to_milliseconds(arg))
            elif t == 'string' and isinstance(arg, Decimal):
                res.append(str(arg))
            elif PY2 and t == 'string' and isinstance(arg, string_type):
                res.append(arg.encode('utf-8'))
            elif PY3 and t == 'string' and isinstance(arg, byte_type):
                res.append(arg.decode('utf-8'))
            else:
                if isinstance(arg, np_generic):
                    arg = arg.item()
                res.append(arg)
        return res

    def process(self, %(input_args)s):
        args = %(input_args)s,
        args = self._handle_input(args)
        if not self.names:
            args = tuple(args) + tuple(self.args)
        else:
            args = (self.name_args(*args), ) + tuple(self.args)
        if self.is_f_generator:
            for r in self.f(*args, **self.kwargs):
                if not isinstance(r, (list, tuple)):
                    r = (r, )
                self.forward(*self._handle_output(r))
        else:
            res = self.f(*args, **self.kwargs)
            if res:
                if not isinstance(res, (list, tuple)):
                    res = (res, )
                self.forward(*self._handle_output(res))

    def close(self):
        if not self.close_f:
            return

        if self.is_close_f_generator:
            for r in self.close_f(*self.args, **self.kwargs):
                if not isinstance(r, (list, tuple)):
                    r = (r, )
                self.forward(*self._handle_output(r))
        else:
            res = self.close_f(*self.args, **self.kwargs)
            if res:
                if not isinstance(res, (list, tuple)):
                    res = (res, )
                self.forward(*self._handle_output(res))
'''
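
# Template for UDAFs generated from custom aggregations; it delegates buffer
# creation, iteration, merging and finalization to the pickled aggregator class.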
UDAF_TMPL = '''
from odps.udf import BaseUDAF


@annotate('%(from_type)s->%(to_type)s')
class %(func_cls_name)s(BaseUDAF):
    def __init__(self):
        unpickler_kw = dict(impl=%(implementation)s, dump_code=%(dump_code)s)
        rs = loads(base64.b64decode('%(resources)s'), **unpickler_kw)
        resources = []
        for t, n, fields in rs:
            if t == 'file':
                resources.append(get_cache_file(str(n)))
            elif t == 'archive':
                resources.append(get_cache_archive(str(n)))
            else:
                tb = get_cache_table(str(n))
                if fields:
                    tb = gen_resource_data(fields, tb)
                resources.append(tb)

        libraries = (l for l in '%(libraries)s'.split(',') if len(l) > 0)
        files = []
        for lib in libraries:
            if lib.startswith('a:'):
                lib = lib[2:]
                f = get_cache_archive_data(lib)
            else:
                f = get_cache_file(lib)
            files.append(read_lib(lib, f))
        sys.meta_path.append(CompressImporter(*files, supersede=%(supersede_libraries)r))
        load_np_generic()

        encoded_func_args = '%(func_args_str)s'
        func_args_str = base64.b64decode(encoded_func_args)
        args = loads(func_args_str, **unpickler_kw) or tuple()
        encoded_func_kwargs = '%(func_kwargs_str)s'
        func_kwargs_str = base64.b64decode(encoded_func_kwargs)
        kwargs = loads(func_kwargs_str, **unpickler_kw) or dict()

        encoded = '%(func_str)s'
        f_str = base64.b64decode(encoded)
        agg = loads(f_str, **unpickler_kw)
        if resources:
            if not args and not kwargs:
                self.f = agg(resources)
            else:
                kwargs['resources'] = resources
                self.f = agg(*args, **kwargs)
        else:
            self.f = agg(*args, **kwargs)

        self.from_types = '%(raw_from_type)s'.split(',')
        self.to_type = '%(to_type)s'

    def _handle_input(self, args):
        from datetime import datetime
        from decimal import Decimal

        res = []
        for t, arg in zip(self.from_types, args):
            if PY2 and t == 'datetime' and arg is not None and not isinstance(arg, datetime):
                res.append(datetime.fromtimestamp(arg / 1000.0))
            elif t == 'decimal' and arg is not None and not isinstance(arg, Decimal):
                res.append(Decimal(arg))
            else:
                res.append(arg)
        return res

    def _to_milliseconds(self, dt):
        return int((time.mktime(dt.timetuple()) + dt.microsecond/1000000.0) * 1000)

    def _handle_output(self, arg):
        from datetime import datetime
        from decimal import Decimal

        t = self.to_type
        if PY2 and t == 'datetime' and isinstance(arg, datetime):
            if isinstance(arg, np_generic):
                arg = arg.item()
            return self._to_milliseconds(arg)
        elif t == 'string' and isinstance(arg, Decimal):
            return str(arg)
        else:
            if isinstance(arg, np_generic):
                arg = arg.item()
            return arg

    def new_buffer(self):
        return self.f.buffer()

    def iterate(self, buffer, %(input_args)s):
        args = %(input_args)s,
        args = self._handle_input(args)
        self.f(buffer, *args)

    def merge(self, buffer, pbuffer):
        self.f.merge(buffer, pbuffer)

    def terminate(self, buffer):
        res = self.f.getvalue(buffer)
        return self._handle_output(res)
'''
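

# Build the UDF source for a MappedExpr node and register it in func_to_udfs,
# reusing an existing function object when an identical signature was seen before.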
def _gen_map_udf(node, func_cls_name, libraries, func, resources,
                 func_to_udfs, func_to_resources, func_to_cus, func_params):
    names_str = ''
    if isinstance(node, MappedExpr) and node._multiple and \
            all(f.name is not None for f in node.inputs):
        names_str = ','.join(f.name for f in node.inputs)

    from_type = ','.join(df_type_to_odps_type(t).name.replace('`', '') for t in node.input_types)
    to_type = df_type_to_odps_type(node.dtype).name.replace('`', '')
    raw_from_type = ','.join(
        df_type_to_odps_type(t).name.replace('`', '') for t in node.raw_input_types
    )
    func_args_str = to_str(
        base64.b64encode(cloudpickle.dumps(node._func_args, dump_code=options.df.dump_udf))
    )
    func_kwargs_str = to_str(
        base64.b64encode(cloudpickle.dumps(node._func_kwargs, dump_code=options.df.dump_udf))
    )

    if inspect.isfunction(func) and not func.__closure__:
        func_sig = id(func.__code__)
    else:
        func_sig = func
    key = (from_type, to_type, func_sig, tuple(resources), names_str, func_args_str, func_kwargs_str)
    if key in func_params:
        node.func = func_params[key]
        return
    else:
        if func in func_to_udfs:
            func = make_copy(func)
            node.func = func
        func_params[key] = func

    func_to_udfs[func] = UDF_TMPL_HEADER + UDF_TMPL % {
        'raw_from_type': raw_from_type,
        'from_type': from_type,
        'to_type': to_type,
        'func_cls_name': func_cls_name,
        'func_str': to_str(base64.b64encode(cloudpickle.dumps(func, dump_code=options.df.dump_udf))),
        'func_args_str': func_args_str,
        'func_kwargs_str': func_kwargs_str,
        'names_str': names_str,
        'resources': to_str(
            base64.b64encode(cloudpickle.dumps([r[:3] for r in resources], dump_code=options.df.dump_udf))),
        'implementation': CLIENT_IMPL,
        'dump_code': options.df.dump_udf,
        'input_args': ', '.join('arg{0}'.format(i) for i in range(len(node.input_types))),
        'libraries': ','.join(libraries if libraries is not None else []),
        'supersede_libraries': options.df.supersede_libraries,
    }
    if resources:
        func_to_resources[func] = resources
    if node._cu_request:
        func_to_cus[func] = node._cu_request
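

# Build the UDTF source for a RowAppliedCollectionExpr node (row-wise apply).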
def _gen_apply_udf(node, func_cls_name, libraries, func, resources,
                   func_to_udfs, func_to_resources, func_to_cus, func_params):
    names_str = ','.join(f.name for f in node.fields)
    from_type = ','.join(df_type_to_odps_type(t).name.replace('`', '') for t in node.input_types)
    raw_from_type = ','.join(
        df_type_to_odps_type(t).name.replace('`', '') for t in node.raw_input_types
    )
    to_type = ','.join(df_type_to_odps_type(t).name.replace('`', '') for t in node.schema.types)
    func_args_str = to_str(
        base64.b64encode(cloudpickle.dumps(node._func_args, dump_code=options.df.dump_udf)))
    func_kwargs_str = to_str(
        base64.b64encode(cloudpickle.dumps(node._func_kwargs, dump_code=options.df.dump_udf)))

    key = (from_type, to_type, func, tuple(resources), names_str, func_args_str, func_kwargs_str)
    if key in func_params:
        return
    else:
        if func in func_to_udfs:
            func = make_copy(func)
            node.func = func
        func_params[key] = func

    func_to_udfs[func] = UDF_TMPL_HEADER + UDTF_TMPL % {
        'raw_from_type': raw_from_type,
        'from_type': from_type,
        'to_type': to_type,
        'func_cls_name': func_cls_name,
        'func_str': to_str(base64.b64encode(cloudpickle.dumps(func, dump_code=options.df.dump_udf))),
        'func_args_str': func_args_str,
        'func_kwargs_str': func_kwargs_str,
        'close_func_str': to_str(
            base64.b64encode(cloudpickle.dumps(getattr(node, '_close_func', None), dump_code=options.df.dump_udf))),
        'names_str': names_str,
        'resources': to_str(base64.b64encode(cloudpickle.dumps([r[:3] for r in resources]))),
        'implementation': CLIENT_IMPL,
        'dump_code': options.df.dump_udf,
        'input_args': ', '.join('arg{0}'.format(i) for i in range(len(node.input_types))),
        'libraries': ','.join(libraries if libraries is not None else []),
        'supersede_libraries': options.df.supersede_libraries,
    }
    if resources:
        func_to_resources[func] = resources
    if node._cu_request:
        func_to_cus[func] = node._cu_request
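

# Build the UDAF source for Aggregation / GroupedAggregation nodes.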
def _gen_agg_udf(node, func_cls_name, libraries, func, resources,
                 func_to_udfs, func_to_resources, func_to_cus, func_params):
    from_type = ','.join(df_type_to_odps_type(t).name.replace('`', '') for t in node.input_types)
    raw_from_type = ','.join(
        df_type_to_odps_type(t).name.replace('`', '') for t in node.raw_input_types
    )
    to_type = df_type_to_odps_type(node.dtype).name.replace('`', '')
    func_args_str = to_str(
        base64.b64encode(cloudpickle.dumps(node._func_args, dump_code=options.df.dump_udf)))
    func_kwargs_str = to_str(
        base64.b64encode(cloudpickle.dumps(node._func_kwargs, dump_code=options.df.dump_udf)))

    key = (from_type, to_type, func, tuple(resources), func_args_str, func_kwargs_str)
    if key in func_params:
        return
    else:
        if func in func_to_udfs:
            func = make_copy(func)
            node.func = func
        func_params[key] = func

    func_to_udfs[func] = UDF_TMPL_HEADER + UDAF_TMPL % {
        'raw_from_type': raw_from_type,
        'from_type': from_type,
        'to_type': to_type,
        'func_cls_name': func_cls_name,
        'func_str': to_str(base64.b64encode(cloudpickle.dumps(func, dump_code=options.df.dump_udf))),
        'func_args_str': func_args_str,
        'func_kwargs_str': func_kwargs_str,
        'resources': to_str(
            base64.b64encode(cloudpickle.dumps([r[:3] for r in resources], dump_code=options.df.dump_udf))),
        'implementation': CLIENT_IMPL,
        'dump_code': options.df.dump_udf,
        'input_args': ', '.join('arg{0}'.format(i) for i in range(len(node.input_types))),
        'libraries': ','.join(libraries if libraries is not None else []),
        'supersede_libraries': options.df.supersede_libraries,
    }
    if resources:
        func_to_resources[func] = resources
    if node._cu_request:
        func_to_cus[func] = node._cu_request
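

# Walk an expression tree and emit the generated UDF sources, the resources each
# UDF depends on, and any per-function CU requests.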
def gen_udf(expr, func_cls_name=None, libraries=None):
    func_to_udfs = OrderedDict()
    func_to_resources = OrderedDict()
    func_to_cus = OrderedDict()
    func_params = dict()

    if libraries is not None:
        def _get_library_name(res):
            if isinstance(res, six.string_types):
                return res
            elif isinstance(res, ArchiveResource):
                return 'a:' + res.name
            else:
                return res.name

        libraries = [_get_library_name(lib) for lib in libraries]

    for node in expr.traverse(unique=True):
        func = getattr(node, 'func', None)
        if func is None:
            continue
        if isinstance(func, six.string_types):
            continue

        resources = []
        collection_idx = 0
        if hasattr(node, '_resources') and node._resources:
            for res in node._resources:
                if isinstance(res, ArchiveResource):
                    tp = 'archive'
                    name = res.name
                    fields = None
                    create = False
                    table_name = None
                elif isinstance(res, FileResource):
                    tp = 'file'
                    name = res.name
                    fields = None
                    create = False
                    table_name = None
                elif isinstance(res, TableResource):
                    tp = 'table'
                    name = res.name
                    fields = tuple(col.name for col in res.get_source_table().table_schema.simple_columns)
                    create = False
                    table_name = None
                else:
                    res = node._collection_resources[collection_idx]
                    collection_idx += 1

                    tp = 'table'
                    name = 'tmp_pyodps_resource_%s' % (uuid.uuid4())
                    fields = tuple(res.schema.names)
                    create = True
                    table_name = get_executed_collection_project_table_name(res)
                resources.append((tp, name, fields, create, table_name))

        if isinstance(node, MappedExpr):
            _gen_map_udf(node, func_cls_name, libraries, func, resources,
                         func_to_udfs, func_to_resources, func_to_cus, func_params)
        elif isinstance(node, RowAppliedCollectionExpr):
            _gen_apply_udf(node, func_cls_name, libraries, func, resources,
                           func_to_udfs, func_to_resources, func_to_cus, func_params)
        elif isinstance(node, (Aggregation, GroupedAggregation)):
            _gen_agg_udf(node, func_cls_name, libraries, func, resources,
                         func_to_udfs, func_to_resources, func_to_cus, func_params)

    return func_to_udfs, func_to_resources, func_to_cus