odps/models/session/v1.py (715 lines of code) (raw):

#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright 1999-2025 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import copy import glob import hashlib import itertools import json import os import re import sys import time import warnings from ... import errors, readers, utils from ...compat import enum, six from ...lib.monotonic import monotonic from .. import tasks from ..instance import Instance, InstanceArrowReader, InstanceRecordReader DEFAULT_TASK_NAME = "AnonymousSQLRTTask" PUBLIC_SESSION_NAME = "public.default" _SUBQUERY_ID_PATTERN = re.compile(r"[\d\D]*_query_(\d+)_[\d\D]*") _SESSION_FILE_PREFIX = "mcqa-session-" _SESSION_FILE_EXPIRE_TIME = 3600 * 24 class FallbackMode(enum.Enum): OFFLINE = 0 INTERACTIVE = 1 class FallbackPolicy: def __init__( self, policy=None, always=False, noresource=False, unsupported=False, timeout=False, upgrading=False, generic=False, ): policies = set() if policy: if isinstance(policy, (set, list, tuple)): policies = set(policy) else: policy = policy.lower().strip() if policy == "default": policy = "unsupported,upgrading,noresource,timeout" elif policy == "all": always = True policies = set(s.strip() for s in policy.split(",")) self.always = always self.noresource = noresource or always or "noresource" in policies self.unsupported = unsupported or always or "unsupported" in policies self.timeout = timeout or always or "timeout" in policies self.upgrading = upgrading or always or "upgrading" in policies self.generic = generic or always or "generic" in policies def get_mode_from_exception(self, exc_value): err_msg = str(exc_value) if isinstance(exc_value, errors.SQARetryError): return FallbackMode.INTERACTIVE elif "OdpsJobCancelledException" in err_msg or "Job is cancelled" in err_msg: return None elif self.always: return FallbackMode.OFFLINE elif self.unsupported and isinstance(exc_value, errors.SQAUnsupportedFeature): return FallbackMode.OFFLINE elif self.upgrading and isinstance( exc_value, (errors.SQAServiceUnavailable, errors.SQAAccessDenied) ): return FallbackMode.OFFLINE elif self.noresource and isinstance(exc_value, errors.SQAResourceNotEnough): return FallbackMode.OFFLINE elif self.timeout and ( isinstance(exc_value, errors.SQAQueryTimedout) or "Wait for cache data timeout" in err_msg or "Get select desc from SQLRTTask timeout" in err_msg ): return FallbackMode.OFFLINE elif self.generic and isinstance(exc_value, errors.SQAGenericError): return FallbackMode.OFFLINE return None def __repr__(self): policies = [ s for s in ["generic", "unsupported", "upgrading", "noresource", "timeout"] if getattr(self, s, None) ] return "<FallbackPolicy %s>" % ",".join(policies) @enum.unique class SessionTaskStatus(enum.Enum): """ Possible statuses of tasks executing inside a session. """ Running = 2 Failed = 4 Terminated = 5 Cancelled = 6 Unknown = -1 TASK_STATUS_VALUES = { 2: SessionTaskStatus.Running, 4: SessionTaskStatus.Failed, 5: SessionTaskStatus.Terminated, 6: SessionTaskStatus.Cancelled, } def _task_status_value_to_enum(task_status): return TASK_STATUS_VALUES.get(task_status, SessionTaskStatus.Unknown) def _get_session_failure_info(task_results): try: taskname, result_txt = list(task_results.items())[0] except BaseException: return "" return result_txt class SessionInstance(Instance): """ This represents the instance created right after you call 'attach_session' or 'create_session'. Further SQL tasks has to be created using this instance. """ __slots__ = ("_project", "_task_name", "_session_name") def __init__(self, **kw): if "session_task_name" not in kw or "session_project" not in kw: raise errors.InvalidArgument( "Creating InSessionInstance without enough information." ) self._task_name = kw.pop("session_task_name", "") self._project = kw.pop("session_project", None) self._session_name = kw.pop("session_name", "") super(SessionInstance, self).__init__(**kw) @classmethod def from_instance(cls, instance, **kw): return SessionInstance( name=instance.id, parent=instance.parent, client=instance._client, **kw ) def _extract_json_info(self): return { "id": self.id, "session_project_name": self._project.name, "session_task_name": self._task_name, "session_name": self._session_name, } def wait_for_startup(self, interval=1, timeout=-1, retry=True, max_interval=None): """ Wait for the session to startup(status changed to RUNNING). :param interval: time interval to check (unit seconds) :param timeout: wait timeout (unit seconds), < 0 means no timeout :param retry: if failed to query session status, should we retry silently :raise: :class:`odps.errors.WaitTimeoutError` if wait timeout and session is not started. :return: None """ start_time = monotonic() end_time = start_time + timeout while not self.is_running(retry): if timeout > 0: if monotonic() > end_time: raise errors.WaitTimeoutError( "Waited %.1f seconds, but session is not started." % (monotonic() - start_time), instance_id=self.id, ) try: time.sleep(interval) if max_interval: interval = min(interval * 2, max_interval) except KeyboardInterrupt: return def _parse_result_session_name(self, result_str): if not self._session_name: session_name_search = re.search("Session name: (.*)$", result_str) if session_name_search: self._session_name = session_name_search.group(1) def run_sql(self, sql, hints=None, **kwargs): task = tasks.SQLTask(query=utils.to_text(sql), **kwargs) task.update_sql_settings(hints) return self._create_internal_instance(task=task) @staticmethod def _check_is_select(sql_statement): try: splited = utils.split_sql_by_semicolon(sql_statement) except Exception as ex: warnings.warn( "Cannot split sql statement %s: %s" % (sql_statement, str(ex)), RuntimeWarning, ) return False return splited[-1].lower().strip(" \t\r\n(").startswith("select") def _create_internal_instance(self, task=None): project_name = self._project.name is_select = self._check_is_select(task.query.strip()) self.parent._fill_task_properties(task) rquery = task.query if not rquery.endswith(";"): rquery = rquery + ";" query_object = { "query": rquery, "settings": json.loads(task.properties["settings"]), } query_json = json.dumps(query_object) resp_content = self.put_task_info( self._task_name, "query", query_json, check_location=True ) created_subquery_id = -1 try: query_result = json.loads(resp_content) query_status = query_result["status"] if query_status != "ok": raise errors.ODPSError( "Failed to run subquery: [%s]: %s" % (query_status, query_result["result"]) ) query_subresult = json.loads(query_result["result"]) created_subquery_id = query_subresult["queryId"] if created_subquery_id == -1: raise errors.parse_instance_error(query_subresult) except KeyError as ex: six.raise_from( errors.ODPSError( "Invalid Response Format: %s\n Response JSON:%s\n" % (str(ex), resp_content.decode()) ), None, ) instance = InSessionInstance( session_project_name=project_name, session_task_name=self._task_name, name=self.id, session_subquery_id=created_subquery_id, session_instance=self, parent=self.parent, session_is_select=is_select, client=self._client, ) return instance def reload(self, blocking=False): resp_text = self.get_task_info(self._task_name, "status") session_status = SessionTaskStatus.Unknown try: poll_result = json.loads(resp_text) self._parse_result_session_name(poll_result["result"]) session_status = _task_status_value_to_enum(poll_result["status"]) except BaseException: error_string = _get_session_failure_info(self.get_task_results()) if error_string: self._status = Instance.Status.TERMINATED six.raise_from(errors.parse_instance_error(error_string), None) else: # this is a task meta info update problem. Just retry. self._status = Instance.Status.SUSPENDED return if session_status == SessionTaskStatus.Running: self._status = Instance.Status.RUNNING elif session_status == SessionTaskStatus.Cancelled: error_string = _get_session_failure_info(self.get_task_results()) self._status = Instance.Status.TERMINATED raise errors.parse_instance_error(error_string) elif session_status == SessionTaskStatus.Failed: error_string = _get_session_failure_info(self.get_task_results()) self._status = Instance.Status.TERMINATED raise errors.parse_instance_error(error_string) elif poll_result["status"] == SessionTaskStatus.Terminated: self._status = Instance.Status.TERMINATED raise errors.ODPSError("Session terminated.") else: self._status = Instance.Status.SUSPENDED class InSessionTunnelReaderMixin(object): @property def schema(self): # is not available before open_reader(). if self._download_session.schema is None: # open reader once to enforce schema fetched. tmprd = self._download_session.open_record_reader(0, 1) tmprd.close() return self._download_session.schema @property def count(self): # we can't count session results before it's # fully retrieved. return -1 @property def status(self): # force reload to update download session status # this is for supporting the stream download of instance tunnel # without the following line will not trigger reload self._download_session.reload() return self._download_session.status def read( self, start=None, count=None, step=None, compress=False, columns=None, **kw ): start = start or 0 step = step or 1 stop = None if count is None else start + step * count with self._download_session.open_record_reader( 0, 1, compress=compress, columns=columns ) as reader: for record in itertools.islice(reader, start, stop, step): yield record class InSessionInstanceArrowReader(InSessionTunnelReaderMixin, InstanceArrowReader): pass class InSessionInstanceRecordReader(InSessionTunnelReaderMixin, InstanceRecordReader): pass class InSessionInstance(Instance): """ This represents the instance created for SQL tasks that run inside a session. This instance is useful when fetching results. """ __slots__ = ( "_project_name", "_session_task_name", "_session", "_session_instance", "_is_select", "_subquery_id", "_report_result", "_report_warning", "_session_task_status", "_task_data", ) def __init__(self, **kw): if ( "session_task_name" not in kw or "session_project_name" not in kw or "session_instance" not in kw or "session_subquery_id" not in kw ): raise errors.InvalidArgument( "Creating InSessionInstance without enough information." ) self._session_task_name = kw.pop("session_task_name", "") self._project_name = kw.pop("session_project_name", "") self._session_instance = kw.pop("session_instance", None) self._is_select = kw.pop("session_is_select", False) self._subquery_id = kw.pop("session_subquery_id", -1) self._report_result = "" self._report_warning = "" self._session_task_status = -1 self._task_data = None if self._subquery_id < 0: raise errors.InternalServerError( "Subquery id not legal: %s" % self._subquery_id ) super(InSessionInstance, self).__init__(**kw) @property def subquery_id(self): return self._subquery_id @utils.survey def _open_result_reader(self, schema=None, task_name=None, **kwargs): """ Fetch result directly from odps. This way does not support limiting, and will cache all result in local memory. To achieve better performance and efficiency, use tunnel instead. """ if not self._is_select: raise errors.InstanceTypeNotSupported("No results for non-select SQL.") self.reload() if not self.is_successful(retry=True): raise errors.ODPSError( "Cannot open reader, instance(%s) may fail or has not finished yet" % self.id ) return readers.CsvRecordReader(schema, self._report_result) def _wait_subquery_id_ready(self): while self._subquery_id == -1 and self._status != Instance.Status.TERMINATED: self.reload() if self._subquery_id == -1: raise errors.InternalServerError("SubQueryId not returned by the server.") def _open_tunnel_reader(self, **kw): if not self._is_select: raise errors.InstanceTypeNotSupported( "InstanceTunnel cannot be opened at a non-select SQL Task." ) self._wait_subquery_id_ready() kw.pop("reopen", False) arrow = kw.pop("arrow", False) endpoint = kw.pop("endpoint", None) quota_name = kw.pop("quota_name", None) kw["sessional"] = True kw["session_subquery_id"] = self._subquery_id if "session_task_name" not in kw: kw["session_task_name"] = self._session_task_name tunnel = self._create_instance_tunnel(endpoint=endpoint, quota_name=quota_name) try: download_session = tunnel.create_download_session(instance=self, **kw) except errors.InternalServerError: e, tb = sys.exc_info()[1:] e.__class__ = Instance.DownloadSessionCreationError six.reraise(Instance.DownloadSessionCreationError, e, tb) self._download_id = download_session.id if arrow: return InSessionInstanceArrowReader(self, download_session) else: return InSessionInstanceRecordReader(self, download_session) def reload(self, blocking=False): resp_text = self.get_task_info( self._session_task_name, "result_%s" % self._subquery_id ) try: query_result = json.loads(resp_text) query_status = query_result["status"] self._report_result = query_result["result"] self._report_warning = query_result["warnings"] self._session_task_status = _task_status_value_to_enum(query_status) if self._session_task_status in ( SessionTaskStatus.Terminated, SessionTaskStatus.Failed, SessionTaskStatus.Cancelled, ): self._status = Instance.Status.TERMINATED elif self._session_task_status == SessionTaskStatus.Running: self._status = Instance.Status.RUNNING else: self._status = Instance.Status.SUSPENDED self._subquery_id = int(query_result.get("subQueryId", -1)) except BaseException as ex: raise errors.ODPSError( "Invalid Response Format: %s\n Response JSON:%s\n" % (str(ex), resp_text) ) def is_successful(self, retry=False, retry_timeout=None): """ If the instance runs successfully. :return: True if successful else False :rtype: bool """ if not self.is_terminated(retry=retry, retry_timeout=retry_timeout): return False if self._session_task_status in ( SessionTaskStatus.Failed, SessionTaskStatus.Cancelled, ): return False return True def wait_for_success( self, interval=1, timeout=None, max_interval=None, blocking=True ): """ Wait for instance to complete, and check if the instance is successful. :param interval: time interval to check :param max_interval: if specified, next check interval will be multiplied by 2 till max_interval is reached. :param blocking: whether to block waiting at server side. Note that this option does not affect client behavior. :param timeout: time :return: None :raise: :class:`odps.errors.ODPSError` if the instance failed """ self.wait_for_completion( interval=interval, max_interval=max_interval, timeout=timeout, blocking=blocking, ) if not self.is_successful(retry=True): raise errors.parse_instance_error(self._report_result) def get_warnings(self): """ Get the warnings reported by ODPS. :return: warning string if ever reported, or empty string for no warning. """ self.reload() return self._report_warning def get_printable_result(self): """ Get the result string that can be directly printed to screen. This should only be used for interactive display. The returning format is not guaranteed. :return: The printable result. On not completed or no result returned, will return empty string. :raise: :class:`odps.errors.ODPSError` if the instance failed. """ self.reload() if self.is_terminated() and not self.is_successful(): raise errors.parse_instance_error(self._report_result) return self._report_result def _get_sql_task(self): resp_text_list = [None] def _load_task_data(): resp_text_list[0] = self.get_task_info( self._session_task_name, "sourcexml_%s" % self._subquery_id ) xml_data = json.loads(resp_text_list[0])["result"] return tasks.SQLTask.parse(None, xml_data) if not self._task_data: self._wait_subquery_id_ready() try: self._task_data = utils.call_with_retry(_load_task_data) except BaseException as ex: raise errors.ODPSError( "Invalid Response Format: %s\n Response JSON:%s\n" % (ex, resp_text_list[0]) ) return self._task_data def get_sql_query(self): try: return self._get_sql_task().query except errors.ODPSError: return None def _parse_subquery_id(self, job_name): if not job_name: return "" match = _SUBQUERY_ID_PATTERN.match(job_name) if match: return match.group(1) elif self.id in job_name: return job_name.split(self.id, 1)[1].replace("_", "") else: return job_name def get_task_detail2(self, task_name=None, **kw): assert task_name is None or task_name == self._session_task_name self._wait_subquery_id_ready() kw["subquery_id"] = "session_query_%d" % self._subquery_id return super(InSessionInstance, self).get_task_detail2( task_name=task_name, **kw ) def _get_queueing_info(self, **kw): self._wait_subquery_id_ready() kw["subquery_id"] = "session_query_%d" % self._subquery_id return super(InSessionInstance, self)._get_queueing_info(**kw) def get_logview_address(self, hours=None, use_legacy=None): self._wait_subquery_id_ready() subquery_suffix = "&subQuery=%s" % self.subquery_id return ( super(InSessionInstance, self).get_logview_address( hours=hours, use_legacy=use_legacy ) + subquery_suffix ) class McqaV1Methods(object): @classmethod @utils.deprecated( "You no longer have to manipulate session instances to use MaxCompute QueryAcceleration. " "Try `run_sql_interactive`." ) def attach_session(cls, odps, session_name, taskname=None, hints=None): """ Attach to an existing session. :param session_name: The session name. :param taskname: The created sqlrt task name. If not provided, the default value is used. Mostly doesn't matter, default works. :return: A SessionInstance you may execute select tasks within. """ return cls._attach_mcqa_session( odps, session_name, task_name=taskname, hints=hints ) @classmethod def _attach_mcqa_session(cls, odps, session_name=None, task_name=None, hints=None): session_name = session_name or PUBLIC_SESSION_NAME task_name = task_name or DEFAULT_TASK_NAME task = tasks.SQLRTTask(name=task_name) task.update_sql_rt_settings(hints) task.update_sql_rt_settings( { "odps.sql.session.share.id": session_name, "odps.sql.submit.mode": "script", } ) project = odps.get_project() return project.instances.create( task=task, session_project=project, session_name=session_name ) @classmethod @utils.deprecated( "You no longer have to manipulate session instances to use MaxCompute QueryAcceleration. " "Try `run_sql_interactive`." ) def default_session(cls, odps): """ Attach to the default session of your project. :return: A SessionInstance you may execute select tasks within. """ return cls._get_default_mcqa_session(odps, wait=False) @classmethod def _get_default_mcqa_session( cls, odps, session_name=None, hints=None, wait=True, service_startup_timeout=60 ): session_name = session_name or PUBLIC_SESSION_NAME if odps._default_session is None: odps._default_session = cls._attach_mcqa_session( odps, session_name, hints=hints ) odps._default_session_name = session_name if wait: odps._default_session.wait_for_startup( 0.1, service_startup_timeout, max_interval=1 ) return odps._default_session @classmethod @utils.deprecated( "You no longer have to manipulate session instances to use MaxCompute QueryAcceleration. " "Try `run_sql_interactive`." ) def create_session( cls, odps, session_worker_count, session_worker_memory, session_name=None, worker_spare_span=None, taskname=None, hints=None, ): """ Create session. :param session_worker_count: How much workers assigned to the session. :param session_worker_memory: How much memory each worker consumes. :param session_name: The session name. Not specifying to use its ID as name. :param worker_spare_span: format "00-24", allocated workers will be reduced during this time. Not specifying to disable this. :param taskname: The created sqlrt task name. If not provided, the default value is used. Mostly doesn't matter, default works. :param hints: Extra hints provided to the session. Parameters of this method will override certain hints. :return: A SessionInstance you may execute select tasks within. """ return cls._create_mcqa_session( odps, session_worker_count, session_worker_memory, session_name, worker_spare_span, taskname, hints, ) @classmethod def _create_mcqa_session( cls, odps, session_worker_count, session_worker_memory, session_name=None, worker_spare_span=None, task_name=None, hints=None, ): if not task_name: task_name = DEFAULT_TASK_NAME session_hints = { "odps.sql.session.worker.count": str(session_worker_count), "odps.sql.session.worker.memory": str(session_worker_memory), "odps.sql.submit.mode": "script", } if session_name: session_hints["odps.sql.session.name"] = session_name if worker_spare_span: session_hints["odps.sql.session.worker.sparespan"] = worker_spare_span task = tasks.SQLRTTask(name=task_name) task.update_sql_rt_settings(hints) task.update_sql_rt_settings(session_hints) project = odps.get_project() return project.instances.create( task=task, session_project=project, session_name=session_name ) @classmethod def _get_mcqa_session_file(cls, odps): try: dir_name = utils.build_pyodps_dir() if not os.path.exists(dir_name): os.makedirs(dir_name) expire_time = time.time() - _SESSION_FILE_EXPIRE_TIME for session_file in glob.glob( os.path.join(dir_name, _SESSION_FILE_PREFIX + "*") ): if os.path.getctime(session_file) < expire_time: try: os.unlink(session_file) except OSError: pass access_id_digest = hashlib.md5( utils.to_binary(odps.account.access_id) ).hexdigest() return os.path.join(dir_name, _SESSION_FILE_PREFIX + access_id_digest) except: return None @classmethod def run_sql_interactive(cls, odps, sql, hints=None, **kwargs): """ Run SQL query in interactive mode (a.k.a MaxCompute QueryAcceleration). Won't fallback to offline mode automatically if query not supported or fails :param sql: the sql query. :param hints: settings for sql query. :return: instance. """ cached_is_running = False service_name = kwargs.pop("service_name", PUBLIC_SESSION_NAME) task_name = kwargs.pop("task_name", None) service_startup_timeout = kwargs.pop("service_startup_timeout", 60) force_reattach = kwargs.pop("force_reattach", False) session_file_name = cls._get_mcqa_session_file(odps) if ( odps._default_session is None and session_file_name and os.path.exists(session_file_name) ): try: with open(session_file_name, "r") as session_file: session_info = json.loads(session_file.read()) instance_obj = odps.get_instance(session_info.pop("id")) session_project = odps.get_project( session_info.pop("session_project_name") ) odps._default_session_name = session_info["session_name"] odps._default_session = SessionInstance.from_instance( instance_obj, session_project=session_project, **session_info ) except: pass if odps._default_session is not None: try: cached_is_running = odps._default_session.is_running() except: pass if ( force_reattach or not cached_is_running or odps._default_session_name != service_name ): # should reattach, for whatever reason (timed out, terminated, never created, # forced using another session) odps._default_session = cls._attach_mcqa_session( odps, service_name, task_name=task_name ) odps._default_session.wait_for_startup( 0.1, service_startup_timeout, max_interval=1 ) odps._default_session_name = service_name if session_file_name: try: with open(session_file_name, "w") as session_file: session_file.write( json.dumps(odps._default_session._extract_json_info()) ) except: pass return odps._default_session.run_sql(sql, hints, **kwargs) @classmethod @utils.deprecated( "The method `run_sql_interactive_with_fallback` is deprecated. " "Try `execute_sql_interactive` with fallback=True argument instead." ) def run_sql_interactive_with_fallback(cls, odps, sql, hints=None, **kwargs): return cls.execute_sql_interactive( odps, sql, hints=hints, fallback="all", wait_fallback=False, **kwargs ) @classmethod def execute_sql_interactive( cls, odps, sql, hints=None, fallback=True, wait_fallback=True, offline_quota_name=None, **kwargs ): """ Run SQL query in interactive mode (a.k.a MaxCompute QueryAcceleration). If query is not supported or fails, and fallback is True, will fallback to offline mode automatically :param sql: the sql query. :param hints: settings for sql query. :param fallback: fallback query to non-interactive mode, True by default. Both boolean type and policy names separated by commas are acceptable. :param bool wait_fallback: wait fallback instance to finish, True by default. :return: instance. """ if isinstance(fallback, (six.string_types, set, list, tuple)): fallback_policy = FallbackPolicy(fallback) elif fallback is False: fallback_policy = None elif fallback is True: fallback_policy = FallbackPolicy("all") else: assert isinstance(fallback, FallbackPolicy) fallback_policy = fallback inst = None use_tunnel = kwargs.pop("tunnel", True) fallback_callback = kwargs.pop("fallback_callback", None) offline_hints = kwargs.pop("offline_hints", None) or {} try: inst = cls.run_sql_interactive(odps, sql, hints=hints, **kwargs) inst.wait_for_success(interval=0.1, max_interval=1) try: rd = inst.open_reader(tunnel=use_tunnel, limit=True) if not rd: raise errors.ODPSError("Get sql result fail") except errors.InstanceTypeNotSupported: # sql is not a select, just skip creating reader pass return inst except BaseException as ex: if fallback_policy is None: raise fallback_mode = fallback_policy.get_mode_from_exception(ex) if fallback_mode is None: raise elif fallback_mode == FallbackMode.INTERACTIVE: kwargs["force_reattach"] = True return cls.execute_sql_interactive( odps, sql, hints=hints, fallback=fallback, wait_fallback=wait_fallback, **kwargs ) else: kwargs.pop("service_name", None) kwargs.pop("force_reattach", None) kwargs.pop("service_startup_timeout", None) hints = copy.copy(offline_hints or hints or {}) hints["odps.task.sql.sqa.enable"] = "false" if fallback_callback is not None: fallback_callback(inst, ex) if inst is not None: hints["odps.sql.session.fallback.instance"] = "%s_%s" % ( inst.id, inst.subquery_id, ) else: hints[ "odps.sql.session.fallback.instance" ] = "fallback4AttachFailed" inst = odps.execute_sql( sql, hints=hints, quota_name=offline_quota_name, **kwargs ) if wait_fallback: inst.wait_for_success() return inst