bugbounty_gpt/db/migrate.py (33 lines of code) (raw):
from alembic.config import Config
from alembic import command
from sqlalchemy import create_engine, inspect
from sqlalchemy.exc import OperationalError
from bugbounty_gpt.env import SQLALCHEMY_URL
import time
import logging
logger = logging.getLogger(__name__)
logger.info("Migration auto-init script activated. To turn this off, re-build without the EPHEMERAL_DB arg.")
time.sleep(2)
SYNCHRONOUS_SQLALCHEMY_URL = SQLALCHEMY_URL.replace("+asyncpg", "")
engine = create_engine(SYNCHRONOUS_SQLALCHEMY_URL)
# Number of attempts to connect to the database
MAX_ATTEMPTS = 5
# Time (in seconds) to wait between each attempt
WAIT_TIME = 5
def check_and_init_submission_table(engine):
"""
Checks if the 'submission' table exists, and if not, runs an alembic migration to create it.
:param engine: The SQLAlchemy engine to use for inspecting the database and running migrations.
"""
inspector = inspect(engine)
if "submission" not in inspector.get_table_names():
logger.info("Submission table not found - attempting alembic auto-generate & init.")
alembic_cfg = Config("/usr/src/app/alembic.ini")
command.revision(alembic_cfg, autogenerate=True, message="Auto-generated migration for 'submission' table.")
command.upgrade(alembic_cfg, "head")
def attempt_database_connection(engine):
"""
Attempts to connect to the database, and initializes the submission table if it does not exist.
:param engine: The SQLAlchemy engine to use for inspecting the database and running migrations.
"""
for attempt in range(MAX_ATTEMPTS):
try:
check_and_init_submission_table(engine)
break # Exit the loop if connection is successful
except OperationalError as e:
logger.warning(f"Failed to connect to database (attempt {attempt + 1} of {MAX_ATTEMPTS}). Retrying in {WAIT_TIME} seconds...")
time.sleep(WAIT_TIME)
else:
# This block executes if the loop completes without a 'break' statement (i.e., all attempts failed)
logger.error("Failed to connect to database after all attempts. Exiting.")
raise Exception("Unable to connect to database")
# Starting the connection attempts
attempt_database_connection(engine)