def create_alloydb_engine()

in use-cases/rag-pipeline/backend/src/alloydb_connect.py [0:0]


def create_alloydb_engine(connector: Connector, catalog_db) -> sqlalchemy.engine.Engine:
    """Build a SQLAlchemy engine whose connections are opened via ``connector``.

    Args:
        connector: Object whose ``connect`` method is used to open each
            pg8000 connection.
        catalog_db: Database name forwarded to ``connector.connect`` as ``db``.

    Returns:
        The engine produced by ``sqlalchemy.create_engine``.

    Note:
        ``instance_uri``, ``user``, ``IPTypes`` and ``logger`` are not defined
        in this function; they are looked up in the enclosing scope
        (presumably module-level globals -- confirm against the full file).
    """

    def _open_connection() -> pg8000.dbapi.Connection:
        # Options are assembled on every call, so the outer-scope names are
        # resolved when a connection is requested, not when the engine is built.
        connect_options = {
            "user": user,
            "db": catalog_db,
            "ip_type": IPTypes.PSC,
            "enable_iam_auth": True,
        }
        return connector.connect(instance_uri, "pg8000", **connect_options)

    # The URL carries only the dialect/driver; the actual connection is
    # supplied by the ``creator`` callable.
    engine = sqlalchemy.create_engine("postgresql+pg8000://", creator=_open_connection)

    # NOTE(review): presumably a pg8000 dialect compatibility setting --
    # the reason is not visible from this block; confirm before removing.
    engine.dialect.description_encoding = None

    logger.info("Connection pool created successfully.")
    return engine