odps/df/backends/odpssql/context.py (232 lines of code) (raw):

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import itertools
import json
import os
import tarfile
import time
import uuid
from collections import OrderedDict

from .... import options, tempobj
from ....compat import BytesIO, six
from ....errors import ODPSError
from ....utils import TEMP_TABLE_PREFIX

# Class name every generated Python UDF wrapper exposes to ODPS SQL.
UDF_CLASS_NAME = 'PyOdpsFunc'

# Module-level mem-cache registries shared across ODPSContext instances:
# expr id -> compiled SQL / dependency id set / "@cN" reference name.
expr_to_sql = dict()
expr_deps = dict()
expr_ref_name = dict()


class ODPSContext(object):
    """Per-compilation state for the ODPS SQL backend of the DataFrame API.

    Tracks table aliases, compiled sub-expressions, generated UDF names and
    the temporary resources/functions created on the ODPS service, so they
    can be dropped again via :meth:`close`.
    """

    def __init__(self, odps, indent_size=2):
        self._odps = odps
        self._index = itertools.count(1)
        self._expr_alias = dict()
        self._compiled_exprs = dict()
        self._expr_raw_args = dict()
        self._col_index = itertools.count(1)
        self._need_alias_columns = dict()
        self._need_alias_column_indexes = dict()
        self._select_index = itertools.count(1)
        self._expr_ref_index = itertools.count(1)
        self._func_to_udfs = OrderedDict()
        self._registered_funcs = OrderedDict()
        self._func_to_functions = OrderedDict()
        self._func_to_resources = OrderedDict()
        self._func_to_cus = OrderedDict()
        self._indent_size = indent_size
        self._mapjoin_hints = []
        self._skewjoin_hints = []
        self._default_schema = odps.schema
        self._path_to_resources = dict()
        # Temporary objects (resources / functions) to drop on close().
        self._to_drops = []

    def next_select_id(self):
        """Return the next monotonically increasing SELECT clause id."""
        return next(self._select_index)

    def new_alias_table_name(self):
        """Generate a fresh table alias of the form ``t<N>``."""
        return 't%s' % next(self._index)

    def new_ref_name(self):
        """Generate a fresh mem-cache reference name of the form ``@c<N>``."""
        return '@c%d' % next(self._expr_ref_index)

    def register_collection(self, expr):
        """Assign (or return the existing) table alias for *expr*."""
        expr_id = id(expr)
        if expr_id in self._expr_alias:
            return self._expr_alias[expr_id]
        self._expr_alias[expr_id] = self.new_alias_table_name()
        return self._expr_alias[expr_id]

    def get_collection_alias(self, expr, create=False, silent=False):
        """Look up the alias of *expr*.

        Returns ``(alias, created)``. If missing and ``create`` is set, a
        new alias is registered; otherwise a :class:`KeyError` propagates
        unless ``silent`` is set, in which case ``None`` is returned.
        """
        try:
            return self._expr_alias[id(expr)], False
        except KeyError:
            if create:
                return self.register_collection(expr), True
            if not silent:
                raise  # bare raise keeps the original traceback

    def remove_collection_alias(self, expr):
        """Drop the alias registered for *expr*, if any."""
        if id(expr) in self._expr_alias:
            del self._expr_alias[id(expr)]

    def add_expr_compiled(self, expr, compiled):
        """Remember the compiled SQL fragment for *expr*."""
        self._compiled_exprs[id(expr)] = compiled

    def get_expr_compiled(self, expr):
        """Return the compiled SQL fragment previously stored for *expr*."""
        return self._compiled_exprs[id(expr)]

    def _gen_udf_name(self, func):
        # Timestamp + uuid makes the generated name unique per run.
        identifier = getattr(func, "_identifier", None) or "udf"
        return 'pyodps_%s_%s_%s' % (
            identifier, int(time.time()), str(uuid.uuid4()).replace('-', '_')
        )

    def _gen_resource_name(self):
        return 'pyodps_res_%s_%s' % (
            int(time.time()), str(uuid.uuid4()).replace('-', '_')
        )

    def _gen_table_name(self):
        return '%s_%s_%s' % (
            TEMP_TABLE_PREFIX, int(time.time()), str(uuid.uuid4()).replace('-', '_')
        )

    def register_udfs(self, func_to_udfs, func_to_resources, func_to_cus):
        """Record UDF sources, their resources and CU settings; assign names."""
        self._func_to_udfs = func_to_udfs
        for func in func_to_udfs.keys():
            self._registered_funcs[func] = self._gen_udf_name(func)
        self._func_to_resources = func_to_resources
        self._func_to_cus = func_to_cus

    def get_udf(self, func):
        """Return the generated function name registered for *func*."""
        return self._registered_funcs[func]

    def get_udf_count(self):
        return len(self._registered_funcs)

    def get_udf_sql_hints(self):
        """Build the ``odps.sql.udf.cu`` SQL hint for functions with CU specs.

        Returns an empty dict when no registered function carries a CU
        setting.
        """
        cu_dict = OrderedDict()
        for func, name in self._registered_funcs.items():
            if func in self._func_to_cus:
                # Fully qualify: project[.schema].function_name
                prefix = self._odps.project + "."
                if self._default_schema:
                    prefix += self._default_schema + "."
                cu_dict[prefix + name] = self._func_to_cus[func]
        if not cu_dict:
            return {}
        return {"odps.sql.udf.cu": json.dumps(cu_dict)}

    def prepare_resources(self, libraries):
        """Turn library paths into ODPS archive resources.

        Existing :class:`Resource` objects pass through unchanged; paths are
        packed into a gzipped tar (a single file under ``pyodps_files/``, or
        a whole directory tree) and uploaded as a temporary archive resource.
        Results are cached per path. Returns ``None`` when *libraries* is
        ``None``.
        """
        from ....models import Resource

        if libraries is None:
            return None
        ret_libs = []
        for lib in libraries:
            if isinstance(lib, Resource):
                ret_libs.append(lib)
                continue
            elif lib in self._path_to_resources:
                # Same path already uploaded in this context — reuse it.
                ret_libs.append(self._path_to_resources[lib])
                continue
            tarbinary = BytesIO()
            tar = tarfile.open(fileobj=tarbinary, mode='w:gz')
            if os.path.isfile(lib):
                with open(lib, 'rb') as fo:
                    finfo = tarfile.TarInfo('pyodps_files/' + os.path.basename(lib))
                    finfo.size = os.path.getsize(lib)
                    tar.addfile(finfo, fo)
            else:
                # Archive the directory, keeping paths relative to its parent
                # so the package name survives as the top-level entry.
                base_dir = os.path.dirname(os.path.abspath(lib))
                for root, dirs, files in os.walk(lib):
                    for f in files:
                        fpath = os.path.join(root, f)
                        rpath = os.path.relpath(fpath, base_dir).replace(
                            os.path.sep, '/'
                        )
                        with open(fpath, 'rb') as fo:
                            finfo = tarfile.TarInfo(rpath)
                            finfo.size = os.path.getsize(fpath)
                            tar.addfile(finfo, fo)
            tar.close()
            res_name = self._gen_resource_name() + '.tar.gz'
            res = self._odps.create_resource(
                res_name, 'archive', fileobj=tarbinary.getvalue(),
                schema=self._default_schema, temp=True
            )
            if options.df.delete_udfs:
                tempobj.register_temp_resource(
                    self._odps, res_name, schema=self._default_schema
                )
                self._to_drops.append(res)
            self._path_to_resources[lib] = res
            ret_libs.append(res)
        return ret_libs

    def create_udfs(self, libraries=None):
        """Upload registered UDF sources and create the ODPS functions.

        For each registered UDF: create its ``.py`` resource, attach any
        declared table resources (creating them on demand) plus *libraries*,
        then create the function pointing at ``<udf_name>.PyOdpsFunc``.
        Created temporaries are queued for deletion when
        ``options.df.delete_udfs`` is on.
        """
        self._func_to_functions.clear()

        for func, udf in six.iteritems(self._func_to_udfs):
            udf_name = self._registered_funcs[func]
            py_resource = self._odps.create_resource(
                udf_name + '.py', 'py', file_obj=udf,
                schema=self._default_schema, temp=True
            )
            if options.df.delete_udfs:
                tempobj.register_temp_resource(
                    self._odps, udf_name + '.py', schema=self._default_schema
                )
                self._to_drops.append(py_resource)

            resources = [py_resource, ]
            if func in self._func_to_resources:
                for _, name, _, create, table_name in self._func_to_resources[func]:
                    if not create:
                        # Pre-existing resource — reference by name.
                        resources.append(name)
                    else:
                        res = self._odps.create_resource(
                            name, 'table', table_name=table_name,
                            schema=self._default_schema, temp=True
                        )
                        if options.df.delete_udfs:
                            tempobj.register_temp_resource(
                                self._odps, name, schema=self._default_schema
                            )
                            self._to_drops.append(res)
                        resources.append(res)
            if libraries is not None:
                resources.extend(libraries)

            function_name = '{0}.{1}'.format(udf_name, UDF_CLASS_NAME)
            function = self._odps.create_function(
                udf_name, class_type=function_name,
                resources=resources, schema=self._default_schema
            )
            if options.df.delete_udfs:
                tempobj.register_temp_function(
                    self._odps, udf_name, schema=self._default_schema
                )
                self._to_drops.append(function)
            self._func_to_functions[func] = function

    def add_need_alias_column(self, column):
        """Assign (or return the existing) ``col_<N>`` symbol for *column*."""
        if id(column) in self._need_alias_column_indexes:
            return self._need_alias_column_indexes[id(column)]
        symbol = 'col_%s' % next(self._col_index)
        self._need_alias_columns[id(column)] = column
        self._need_alias_column_indexes[id(column)] = symbol
        return symbol

    def get_all_need_alias_column_symbols(self):
        """Yield ``(symbol, column)`` pairs in registration order."""
        for col_id, column in six.iteritems(self._need_alias_columns):
            symbol = self._need_alias_column_indexes[col_id]
            yield symbol, column

    #####################################
    # mem cache relatives
    #####################################

    def register_mem_cache_sql(self, expr, sql):
        expr_to_sql[expr._id] = sql

    def is_expr_mem_cached(self, expr):
        return expr._id in expr_to_sql

    def get_mem_cached_sql(self, expr):
        return expr_to_sql[expr._id]

    def register_mem_cache_dep(self, expr, dep):
        """Record that *expr*'s cached SQL depends on *dep*'s."""
        expr_deps.setdefault(expr._id, set()).add(dep._id)

    def get_mem_cache_ref_name(self, expr, create=True):
        """Return the ``@cN`` reference name of *expr*.

        With ``create`` a missing name is generated and stored; otherwise a
        missing name raises :class:`KeyError`.
        """
        if not create:
            return expr_ref_name[expr._id]
        # Only draw a new counter value when the name is actually missing;
        # using it as a dict.get() default would burn an index on every hit.
        if expr._id not in expr_ref_name:
            expr_ref_name[expr._id] = self.new_ref_name()
        return expr_ref_name[expr._id]

    def get_mem_cache_dep_sqls(self, *expr_ids):
        """Return ``@ref := CACHE ON <sql>`` statements in dependency order.

        Dependencies are emitted depth-first before their dependents; each
        expression appears at most once.
        """
        sqls = []
        fetched = dict()

        def h(eid):
            if eid in expr_deps:
                for dep_id in expr_deps[eid]:
                    if dep_id not in fetched:
                        h(dep_id)
            if eid not in fetched:
                origin_sql = expr_to_sql[eid]
                sql = '{0} := CACHE ON {1}'.format(
                    expr_ref_name[eid], origin_sql
                )
                sqls.append(sql)
                fetched[eid] = True

        for expr_id in expr_ids:
            h(expr_id)
        return sqls

    def _drop_silent(self, obj):
        # Best-effort cleanup: service-side failures must not mask the
        # caller's own error path.
        try:
            obj.drop()
        except ODPSError:
            pass

    @property
    def default_schema(self):
        return self._default_schema

    @default_schema.setter
    def default_schema(self, value):
        self._default_schema = value

    def close(self):
        """Drop every temporary resource/function created by this context."""
        # Plain loop — a comprehension built a throwaway list of Nones here.
        for it in self._to_drops:
            self._drop_silent(it)