services/ui_backend_service/data/db/postgres_async_db.py (41 lines of code) (raw):
import json
import os
import time
from typing import List
import aiopg
import psycopg2
import psycopg2.extras
# Base-level classes from the shared data adapter, to inherit from.
from services.data.postgres_async_db import \
_AsyncPostgresDB as BaseAsyncPostgresDB
from services.utils import DBConfiguration, logging
from .tables import (AsyncArtifactTablePostgres, AsyncFlowTablePostgres,
AsyncMetadataTablePostgres, AsyncRunTablePostgres,
AsyncStepTablePostgres, AsyncTaskTablePostgres)
class AsyncPostgresDB(BaseAsyncPostgresDB):
    """
    Database adapter tailored to the UI Backend service.

    Anything not defined here is inherited from the adapter class exposed by
    the shared services.data.postgres_async_db module.

    Parameters
    ----------
    name : str (optional)
        Label for this adapter instance. It is stored on the instance and
        embedded in the logger name ("AsyncPostgresDB:<name>").
        Defaults to 'global'.
    """
    # Connection-related placeholders. Nothing in this class assigns them;
    # presumably the inherited connection logic fills them in -- confirm
    # against the base class.
    connection = None
    pool = None
    reader_pool = None
    db_conf: DBConfiguration = None

    # Table handler placeholders; each one is replaced per instance in __init__.
    flow_table_postgres = None
    run_table_postgres = None
    step_table_postgres = None
    task_table_postgres = None
    artifact_table_postgres = None
    metadata_table_postgres = None

    def __init__(self, name='global'):
        # NOTE(review): the base class initializer is not invoked here; this
        # looks deliberate so that only the UI-specific table classes imported
        # from .tables get instantiated -- confirm before changing.
        self.name = name
        self.logger = logging.getLogger("AsyncPostgresDB:{name}".format(name=self.name))

        # Each table handler receives this adapter instance as its argument.
        self.flow_table_postgres = AsyncFlowTablePostgres(self)
        self.run_table_postgres = AsyncRunTablePostgres(self)
        self.step_table_postgres = AsyncStepTablePostgres(self)
        self.task_table_postgres = AsyncTaskTablePostgres(self)
        self.artifact_table_postgres = AsyncArtifactTablePostgres(self)
        self.metadata_table_postgres = AsyncMetadataTablePostgres(self)

        # Published only after every handler exists, in construction order.
        self.tables = [
            self.flow_table_postgres,
            self.run_table_postgres,
            self.step_table_postgres,
            self.task_table_postgres,
            self.artifact_table_postgres,
            self.metadata_table_postgres,
        ]