in aws_advanced_python_wrapper/mysql_driver_dialect.py [0:0]
def get_connection_from_obj(self, obj: object) -> Any:
    """Return the MySQL connection that ``obj`` is, or that it points to.

    Args:
        obj: The object to inspect. Only the connection and cursor classes
            referenced below are recognised.

    Returns:
        * ``obj`` itself when it is a ``CMySQLConnection`` or
          ``MySQLConnection``.
        * For a ``CMySQLCursor``: the value of its ``_cnx`` attribute (or of
          ``_connection`` when ``_cnx`` does not exist), provided that value
          is one of the two connection classes.
        * For a ``MySQLCursor``: the value of its ``_connection`` attribute,
          provided that value is one of the two connection classes.
        * ``None`` in every other case, including when a ``ReferenceError``
          is raised while the cursor's connection is being inspected.
    """
    connection_types = (CMySQLConnection, MySQLConnection)

    # A connection needs no unwrapping.
    if isinstance(obj, connection_types):
        return obj

    if isinstance(obj, CMySQLCursor):
        try:
            # ``_cnx`` wins whenever the attribute exists; ``_connection`` is
            # consulted only if ``_cnx`` is missing altogether.
            if hasattr(obj, '_cnx'):
                owner = obj._cnx
            elif hasattr(obj, '_connection'):
                owner = obj._connection
            else:
                owner = None

            if owner is None:
                return None
            if isinstance(owner, connection_types):
                return owner
        except ReferenceError:
            # NOTE(review): presumably the cursor holds a weak proxy to its
            # connection and the connection is already gone - confirm.
            return None
        # An owner of an unrecognised type is not returned; control
        # continues with the MySQLCursor check below.

    if isinstance(obj, MySQLCursor):
        try:
            owner = obj._connection
            if isinstance(owner, connection_types):
                return owner
        except ReferenceError:
            return None

    return None