in src/ab/plugins/db/db_conn_pool.py [0:0]
def get_engine(db_config: str, echo=False, **kwargs):
    """
    Create a SQLAlchemy engine for ``db_config`` and adapt it for multi-process use.

    The prefix of ``db_config`` selects the default engine options:

    * ``oracle``: requires ``ORACLE_HOME`` and one of ``DYLD_LIBRARY_PATH`` /
      ``LD_LIBRARY_PATH`` in the environment; uses ``optimize_limits=True``
      and ``NullPool``.
    * ``hive``: uses ``NullPool`` (connection pooling disabled).
    * ``sqlite``: no extra options (default pool of ``create_engine``).
    * anything else (treated as mysql): ``pool_use_lifo=True``,
      ``pool_recycle=3600``, ``pool_pre_ping=True``, ``pool_size=50``.

    Every engine also gets two pool listeners: "connect" stores the creating
    process id on the connection record, and "checkout" rejects a connection
    whose stored pid differs from the current process.

    :param db_config: database URL string.
    :param echo: passed to ``create_engine`` as both ``echo`` and ``echo_pool``.
    :param kwargs: extra ``create_engine`` options; they override the
        per-backend defaults listed above.
    :return: the configured engine.
    :raises AlgorithmException: if ``db_config`` is not a string.
    :raises AssertionError: if an oracle URL is used without the required
        environment variables.

    # TODO refactor to class, one for each engine
    """
    if not isinstance(db_config, str):
        raise AlgorithmException('config.DB only supports string format since v2.4.0')

    if db_config.startswith('oracle'):
        oracle_home = os.getenv('ORACLE_HOME')
        library_path = os.getenv('DYLD_LIBRARY_PATH') or os.getenv('LD_LIBRARY_PATH')
        if not (oracle_home and library_path):
            # Raised explicitly rather than via `assert` so the check is not
            # stripped under `python -O`; the exception type is unchanged.
            raise AssertionError(
                'please set ORACLE_HOME and (DY)LD_LIBRARY_PATH to use oracle database')
        # FIXME: use the default QueuePool may cause some strange bugs in async mode
        backend_defaults = {'optimize_limits': True, 'poolclass': NullPool}
    elif db_config.startswith('hive'):
        # disable hive connection pool
        backend_defaults = {'poolclass': NullPool}
    elif db_config.startswith('sqlite'):
        # TODO: sqlite can't use NullPool in multi-process mode. perhaps a bug of sqlalchemy read/write lock
        backend_defaults = {}
    else:
        # default mysql
        backend_defaults = {
            'pool_use_lifo': True,
            'pool_recycle': 3600,
            'pool_pre_ping': True,
            'pool_size': 50,
        }

    # Merge instead of passing defaults and **kwargs side by side: a caller
    # overriding e.g. `pool_size` or `poolclass` would otherwise hit
    # "TypeError: got multiple values for keyword argument".
    engine_options = {'echo_pool': echo}
    engine_options.update(backend_defaults)
    engine_options.update(kwargs)
    new_engine = create_engine(db_config, echo=echo, **engine_options)

    @event.listens_for(new_engine, "connect")
    def connect(dbapi_connection, connection_record):
        # Remember which process opened this connection.
        connection_record.info['pid'] = os.getpid()

    @event.listens_for(new_engine, "checkout")
    def checkout(dbapi_connection, connection_record, connection_proxy):
        pid = os.getpid()
        if connection_record.info['pid'] != pid:
            # The connection was opened by a different process: detach it from
            # the record and proxy, then signal a disconnection.
            connection_record.connection = connection_proxy.connection = None
            raise exc.DisconnectionError(
                "Connection record belongs to pid %s, "
                "attempting to check out in pid %s" %
                (connection_record.info['pid'], pid)
            )

    return new_engine