runnable-hub/python/runnable_hub/store/database.py (41 lines of code) (raw):

import asyncio import aiomysql from typing import Dict from runnable_hub.interface import RunnableDatabaseStore class RunnableMySQLStore(RunnableDatabaseStore): def __init__(self, host, port, user, password, db): self.host = host self.port = port self.user = user self.password = password self.db = db self._pool = None async def _get_pool(self): if self._pool is None: self._pool = await aiomysql.create_pool( host=self.host, port=self.port, user=self.user, password=self.password, db=self.db, autocommit=True ) return self._pool async def queryRows(self, table, where:Dict): pool = await self._get_pool() async with pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cur: where_clause = ' AND '.join([f"{k}=%s" for k in where.keys()]) sql = f"SELECT * FROM {table} WHERE {where_clause}" await cur.execute(sql, tuple(where.values())) result = await cur.fetchall() return result async def insertRows(self, table, data:Dict): pool = await self._get_pool() async with pool.acquire() as conn: async with conn.cursor() as cur: columns = ', '.join(data.keys()) placeholders = ', '.join(['%s'] * len(data)) sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})" await cur.execute(sql, tuple(data.values())) await conn.commit()