services/migration_service/data/postgres_async_db.py (37 lines of code) (raw):
import time
import os
import aiopg
from services.utils import DBConfiguration
class PostgresUtils(object):
@staticmethod
async def is_present(table_name):
with (await AsyncPostgresDB.get_instance().pool.cursor()) as cur:
await cur.execute(
"select * from information_schema.tables where table_name=%s",
(table_name,),
)
return bool(cur.rowcount)
class AsyncPostgresDB(object):
connection = None
__instance = None
pool = None
@staticmethod
def get_instance():
if AsyncPostgresDB.__instance is None:
AsyncPostgresDB()
return AsyncPostgresDB.__instance
def __init__(self):
if self.__instance is not None:
return
AsyncPostgresDB.__instance = self
async def _init(self, db_conf: DBConfiguration):
# todo make poolsize min and max configurable as well as timeout
# todo add retry and better error message
retries = 3
for i in range(retries):
try:
self.pool = await aiopg.create_pool(db_conf.get_dsn(), timeout=db_conf.timeout)
except Exception as e:
print("printing connection exception: " + str(e))
if retries - i < 1:
raise e
time.sleep(1)
continue