def execute()

in src/ab/plugins/db/dao.py [0:0]


    def execute(self, query, *args, **kwargs):
        # only for debug
        self.last_query = str(query)

        # TODO save conn for transaction, like RDS
        with self.engine.connect() as conn:
            try:
                result = conn.execute(query, *args, **kwargs)
            except sqlalchemy.exc.IntegrityError as e:
                raise DuplicatedKeyException() from e
            else:
                if isinstance(query, Select):
                    return [dict(r) for r in list(result)]
                else:
                    return result