in src/ab/plugins/db/dao.py [0:0]
def execute(self, query, *args, **kwargs):
# only for debug
self.last_query = str(query)
# TODO save conn for transaction, like RDS
with self.engine.connect() as conn:
try:
result = conn.execute(query, *args, **kwargs)
except sqlalchemy.exc.IntegrityError as e:
raise DuplicatedKeyException() from e
else:
if isinstance(query, Select):
return [dict(r) for r in list(result)]
else:
return result