dbt/adapters/maxcompute/connections.py (57 lines of code) (raw):
from contextlib import contextmanager
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.sql import SQLConnectionManager
from dbt_common.exceptions import DbtConfigError, DbtRuntimeError
from odps import options
from dbt.adapters.maxcompute.context import GLOBAL_SQL_HINTS
from dbt.adapters.maxcompute.wrapper import ConnectionWrapper
logger = AdapterLogger("MaxCompute")
class MaxComputeConnectionManager(SQLConnectionManager):
TYPE = "maxcompute"
@classmethod
def open(cls, connection):
if connection.state == "open":
logger.debug("Connection is already open, skipping open.")
return connection
credentials = connection.credentials
o = credentials.odps()
# always use UTC timezone
options.local_timezone = False
options.user_agent_pattern = "dbt-maxcompute $pyodps_version $python_version"
try:
o.get_project().reload()
except Exception as e:
raise DbtConfigError(f"Failed to connect to MaxCompute: {str(e)}") from e
handle = ConnectionWrapper(odps=o, hints=GLOBAL_SQL_HINTS)
connection.state = "open"
connection.handle = handle
return connection
@classmethod
def get_response(cls, cursor):
# FIXME:we should get 'code', 'message', 'rows_affected' from cursor
logger.debug("Current instance id is " + cursor._instance.id)
return AdapterResponse(_message="OK")
@contextmanager
def exception_handler(self, sql: str):
try:
yield
except Exception as exc:
logger.debug("Error while running:\n{}".format(sql))
logger.debug(exc)
if len(exc.args) == 0:
raise
thrift_resp = exc.args[0]
if hasattr(thrift_resp, "status"):
msg = thrift_resp.status.errorMessage
raise DbtRuntimeError(msg)
else:
raise DbtRuntimeError(str(exc))
def cancel(self, connection):
connection.handle.cancel()
def begin(self):
logger.debug("Trigger beginning transaction, actually do nothing...")
# FIXME: Sometimes the number of commits is greater than the number of begins.
# It should be a problem with the micro, which can be reproduced through the test of dbt_show.
def commit(self):
logger.debug("Committing transaction, actually do nothing...")
def add_begin_query(self):
pass
def add_commit_query(self):
pass