in retrieval_service/datastore/providers/cloudsql_mysql.py [0:0]
def create_sync(cls, config: Config) -> "Client":
def getconn() -> pymysql.Connection:
if cls.__connector is None:
cls.__connector = Connector(refresh_strategy=RefreshStrategy.LAZY)
return cls.__connector.connect(
# Cloud SQL instance connection name
f"{config.project}:{config.region}:{config.instance}",
"pymysql",
user=f"{config.user}",
password=f"{config.password}",
db=f"{config.database}",
autocommit=True,
)
pool = create_engine(
"mysql+pymysql://",
creator=getconn,
)
if pool is None:
raise TypeError("pool not instantiated")
return cls(pool, config.database)