in odps/df/backends/odpssql/engine.py [0:0]
def _do_execute(self, expr_dag, expr, ui=None, progress_proportion=1,
                lifecycle=None, head=None, tail=None, hints=None,
                priority=None, running_cluster=None, schema=None, **kw):
    """
    Execute the root expression of ``expr_dag`` by compiling it to SQL,
    running it and fetching the result.

    Execution proceeds in this order:

    1. The default schema stored on ``self._ctx`` is updated from ``schema``
       (and falls back to ``"default"`` when the schema namespace is enabled).
    2. The DAG is passed through ``_convert_table`` and ``_rewrite``. If its
       root is a ``Scalar`` that already carries a value, that value is
       returned immediately.
    3. If ``options.df.optimizes.tunnel`` is set, ``_handle_cases`` is tried
       first; a non-``None`` result (or any result when ``_force_tunnel`` is
       given) is returned without running SQL.
    4. Otherwise the expression is compiled, optionally wrapped in a
       ``CREATE TABLE ... AS`` statement targeting a registered temp table,
       run via ``_run`` and read back via ``_fetch``.

    :param expr_dag: DAG to execute; its ``root`` after conversion/rewriting
        is the expression that actually runs.
    :param expr: expression as supplied by the caller; forwarded to
        ``_fetch`` as the source expression.
    :param ui: progress object; ``inc`` and ``status`` are called on it here
        and it is handed to ``_handle_cases``, ``_run`` and ``_fetch``.
        NOTE(review): the default is ``None`` but it is dereferenced without
        a check -- callers are presumably expected to always pass one.
    :param progress_proportion: share of the progress owned by this call;
        ``_run`` receives 0.9 of it and ``_fetch`` 0.1 of it.
    :param lifecycle: lifecycle for the temp table; falls back to
        ``options.temp_lifecycle`` when falsy.
    :param head: forwarded to ``_handle_cases`` and ``_fetch``; also used to
        slice the expression when the tunnel attempt fails.
    :param tail: forwarded to ``_handle_cases`` and ``_fetch``.
    :param hints: hints passed to ``_run``; copied before being modified.
    :param priority: forwarded to ``_run``.
    :param running_cluster: forwarded to ``_run``.
    :param schema: when truthy, replaces ``self._ctx.default_schema``.
    :param kw: recognised keys are ``group``, ``libraries``, ``image``,
        ``use_tunnel``, ``_force_tunnel``, ``finish`` and ``ret_instance``.
    :return: the scalar value, the ``_handle_cases`` result, or the
        ``_fetch`` result; when ``ret_instance`` is exactly ``True`` the
        latter is returned as ``(instance, result)``.
    """
    if not lifecycle:
        lifecycle = options.temp_lifecycle

    # ``kw`` is private to this call, so every option can be read up front.
    group = kw.get('group')
    libs = kw.pop('libraries', None)
    image = kw.pop('image', None)
    fetch_via_tunnel = kw.get('use_tunnel', True)
    force_tunnel = kw.get('_force_tunnel', False)
    finish = kw.get('finish', True)
    ret_instance = kw.get('ret_instance', False) is True

    self._ctx.default_schema = schema if schema else self._ctx.default_schema
    if self._odps.is_schema_namespace_enabled(hints):
        self._ctx.default_schema = self._ctx.default_schema or "default"

    expr_dag = self._convert_table(expr_dag)
    self._rewrite(expr_dag)

    # ``expr`` keeps pointing at the caller's expression; ``target`` is what runs.
    target = expr_dag.root

    if isinstance(target, Scalar) and target.value is not None:
        ui.inc(progress_proportion)
        return target.value

    tunnel_refused = False
    if options.df.optimizes.tunnel:
        try:
            shortcut = self._handle_cases(
                target, ui, progress_proportion=progress_proportion,
                head=head, tail=tail)
        except KeyboardInterrupt:
            ui.status('Halt by interruption')
            sys.exit(1)
        except (NoPermission, ConnectTimeout) as ex:
            shortcut = None
            tunnel_refused = True
            if head:
                target = target[:head]
            warnings.warn('Failed to download data by table tunnel, 10000 records will be limited.\n' +
                          'Cause: ' + str(ex))

        if force_tunnel or shortcut is not None:
            return shortcut

    try:
        sql = self._compile(target, libraries=libs)

        if types.get_local_use_odps2_types(self._odps.get_project()):
            # Work on a copy so the caller's hints dict is left untouched.
            hints = copy.copy(hints or {})
            hints["odps.sql.type.system.odps2"] = "true"

        cache_table = None
        materialize = (
            not tunnel_refused
            and isinstance(target, CollectionExpr)
            and not isinstance(target, Summary)
        )
        if materialize:
            # The tunnel shortcut did not produce a result, so write the
            # query output into a temp table instead.
            temp_name = '%s%s' % (TEMP_TABLE_PREFIX, str(uuid.uuid4()).replace('-', '_'))
            register_temp_table(self._odps, temp_name, schema=self._ctx.default_schema)
            cache_table = self._odps.get_table(temp_name, schema=self._ctx.default_schema)

            if lifecycle is None:
                lifecycle_clause = ''
            else:
                lifecycle_clause = 'LIFECYCLE {0} '.format(lifecycle)

            def as_create_table(select_sql):
                return 'CREATE TABLE {0} {1}AS \n{2}'.format(temp_name, lifecycle_clause, select_sql)

            # For a list of statements only the last one is wrapped.
            if isinstance(sql, list):
                sql[-1] = as_create_table(sql[-1])
            else:
                sql = as_create_table(sql)

        sql = self._join_sql(sql)
        logger.info('Sql compiled:\n' + sql)

        try:
            instance = self._run(
                sql, ui, progress_proportion=progress_proportion * 0.9, hints=hints,
                priority=priority, running_cluster=running_cluster, group=group,
                libraries=libs, image=image, schema=self._ctx.default_schema)
        finally:
            # Clear udfs and resources generated, whether or not the run succeeded.
            self._ctx.close()

        fetched = self._fetch(
            target, expr, instance, ui,
            cache_data=cache_table, head=head, tail=tail,
            use_tunnel=fetch_via_tunnel, group=group,
            progress_proportion=progress_proportion * 0.1,
            finish=finish)
    finally:
        # Always reset the local odps2-types flag, even on failure.
        types.set_local_use_odps2_types(None)

    return (instance, fetched) if ret_instance else fetched