def _do_execute()

in odps/df/backends/odpssql/engine.py [0:0]


    def _do_execute(self, expr_dag, expr, ui=None, progress_proportion=1,
                    lifecycle=None, head=None, tail=None, hints=None,
                    priority=None, running_cluster=None, schema=None, **kw):
        lifecycle = lifecycle or options.temp_lifecycle
        group = kw.get('group')
        libraries = kw.pop('libraries', None)
        image = kw.pop('image', None)
        use_tunnel = kw.get('use_tunnel', True)

        self._ctx.default_schema = schema or self._ctx.default_schema
        if self._odps.is_schema_namespace_enabled(hints):
            self._ctx.default_schema = self._ctx.default_schema or "default"

        expr_dag = self._convert_table(expr_dag)
        self._rewrite(expr_dag)

        src_expr = expr
        expr = expr_dag.root

        if isinstance(expr, Scalar) and expr.value is not None:
            ui.inc(progress_proportion)
            return expr.value

        no_permission = False
        if options.df.optimizes.tunnel:
            force_tunnel = kw.get('_force_tunnel', False)
            try:
                result = self._handle_cases(expr, ui, progress_proportion=progress_proportion,
                                            head=head, tail=tail)
            except KeyboardInterrupt:
                ui.status('Halt by interruption')
                sys.exit(1)
            except (NoPermission, ConnectTimeout) as ex:
                result = None
                no_permission = True
                if head:
                    expr = expr[:head]
                warnings.warn('Failed to download data by table tunnel, 10000 records will be limited.\n' +
                              'Cause: ' + str(ex))
            if force_tunnel or result is not None:
                return result

        try:
            sql = self._compile(expr, libraries=libraries)
            if types.get_local_use_odps2_types(self._odps.get_project()):
                hints = copy.copy(hints or {})
                hints["odps.sql.type.system.odps2"] = "true"

            cache_data = None
            if not no_permission and isinstance(expr, CollectionExpr) and not isinstance(expr, Summary):
                # When tunnel cannot handle, we will try to create a table
                tmp_table_name = '%s%s' % (TEMP_TABLE_PREFIX, str(uuid.uuid4()).replace('-', '_'))
                register_temp_table(self._odps, tmp_table_name, schema=self._ctx.default_schema)
                cache_data = self._odps.get_table(tmp_table_name, schema=self._ctx.default_schema)

                lifecycle_str = 'LIFECYCLE {0} '.format(lifecycle) if lifecycle is not None else ''
                format_sql = lambda s: 'CREATE TABLE {0} {1}AS \n{2}'.format(tmp_table_name, lifecycle_str, s)
                if isinstance(sql, list):
                    sql[-1] = format_sql(sql[-1])
                else:
                    sql = format_sql(sql)

            sql = self._join_sql(sql)

            logger.info('Sql compiled:\n' + sql)

            try:
                instance = self._run(sql, ui, progress_proportion=progress_proportion * 0.9, hints=hints,
                                     priority=priority, running_cluster=running_cluster, group=group,
                                     libraries=libraries, image=image, schema=self._ctx.default_schema)
            finally:
                self._ctx.close()  # clear udfs and resources generated

            res = self._fetch(expr, src_expr, instance, ui,
                              cache_data=cache_data, head=head, tail=tail,
                              use_tunnel=use_tunnel, group=group,
                              progress_proportion=progress_proportion * 0.1,
                              finish=kw.get('finish', True))
        finally:
            types.set_local_use_odps2_types(None)

        if kw.get('ret_instance', False) is True:
            return instance, res
        return res