dbt/adapters/maxcompute/wrapper.py (45 lines of code) (raw):

import copy
import time

from dbt.adapters.events.logging import AdapterLogger
from odps.dbapi import Cursor, Connection
from odps.errors import ODPSError

from dbt.adapters.maxcompute.setting_parser import SettingParser

logger = AdapterLogger("MaxCompute")

# Total number of attempts made for a statement that keeps failing with a
# retryable error code.
_MAX_ATTEMPTS = 10

# Seconds to wait between two attempts.
_RETRY_WAIT_SECONDS = 15

# ODPS error codes after which the statement is attempted again.
_RETRYABLE_ERROR_CODES = frozenset(
    {
        "ODPS-0130201",  # view not found
        "ODPS-0130211",  # table or view already exists
        "ODPS-0110061",  # table not found
        "ODPS-0130131",  # table not found
        "ODPS-0420111",  # NOTE(review): meaning not documented in this file
    }
)


class ConnectionWrapper(Connection):
    """DB-API connection whose cursors are ``CursorWrapper`` instances."""

    def cursor(self, *args, **kwargs):
        """Return a new ``CursorWrapper`` bound to this connection.

        The connection's hints are deep-copied so the cursor receives its
        own independent copy. All other arguments are forwarded unchanged.
        """
        return CursorWrapper(
            self,
            *args,
            hints=copy.deepcopy(self._hints),
            **kwargs,
        )

    def cancel(self):
        """Cancel by closing the connection."""
        self.close()


class CursorWrapper(Cursor):
    """DB-API cursor that retries statements failing with selected ODPS errors."""

    def execute(self, operation, parameters=None, **kwargs):
        """Execute ``operation`` and wait for the resulting instance to succeed.

        ``SettingParser.parse`` splits ``operation`` into settings and the
        remaining query; the settings are passed to the parent cursor as
        ``hints``. If execution raises an ``ODPSError`` whose code is in
        ``_RETRYABLE_ERROR_CODES``, the statement is attempted again after
        ``_RETRY_WAIT_SECONDS`` seconds, for at most ``_MAX_ATTEMPTS``
        attempts in total.

        Args:
            operation: SQL text, optionally carrying settings understood by
                ``SettingParser``.
            parameters: Accepted for interface compatibility.
            **kwargs: Accepted for interface compatibility.

        Raises:
            ODPSError: Immediately for a non-retryable error code, or once
                the last attempt has failed with a retryable one.
        """
        # NOTE(review): ``parameters`` and ``kwargs`` are not forwarded to the
        # parent ``execute`` -- confirm that callers never rely on them.
        parsed = SettingParser.parse(operation)
        for attempt in range(1, _MAX_ATTEMPTS + 1):
            try:
                super().execute(parsed.remaining_query, hints=parsed.settings)
                self._instance.wait_for_success()
                return
            except ODPSError as e:
                if e.code not in _RETRYABLE_ERROR_CODES:
                    self._log_logview_address(e)
                    raise
                if attempt == _MAX_ATTEMPTS:
                    raise
                logger.warning(f"Retry because of {e}, retry times {attempt}")
                time.sleep(_RETRY_WAIT_SECONDS)

    def _log_logview_address(self, error):
        """Log the Logview address of the instance named by ``error``, if any.

        This is diagnostic only: a failure while looking the instance up is
        logged and swallowed so that it cannot replace the original error the
        caller is about to re-raise.
        """
        if not error.instance_id:
            return
        try:
            instance = self.connection.odps.get_instance(error.instance_id)
            logger.error(instance.get_logview_address())
        except Exception as lookup_error:
            logger.warning(
                f"Could not fetch logview address for instance "
                f"{error.instance_id}: {lookup_error}"
            )