def get_engine()

in src/ab/plugins/db/db_conn_pool.py [0:0]


def get_engine(db_config: str, echo=False, **kwargs):
    """
    create engine and adapt for multi-process
    # TODO refactor to class, one for each engine
    """
    if not isinstance(db_config, str):
        raise AlgorithmException('config.DB only supports string format since v2.4.0')

    if db_config.startswith('oracle'):
        assert os.getenv('ORACLE_HOME') and (os.getenv('DYLD_LIBRARY_PATH') or os.getenv('LD_LIBRARY_PATH')),\
            'please set ORACLE_HOME and (DY)LD_LIBRARY_PATH to use oracle database'
        # FIXME: use the default QueuePool may cause some strange bugs in async mode
        new_engine = create_engine(db_config, echo=echo, echo_pool=echo,
                                   optimize_limits=True, poolclass=NullPool, **kwargs)
    elif db_config.startswith('hive'):
        # disable hive connection pool
        new_engine = create_engine(db_config, echo=echo, echo_pool=echo, poolclass=NullPool, **kwargs)
    elif db_config.startswith('sqlite'):
        # TODO: sqlite can't use NullPoll in multi-process mode. perhaps a bug of sqlalchemy read/write lock
        new_engine = create_engine(db_config, echo=echo, echo_pool=echo, **kwargs)
    else:
        # default mysql
        new_engine = create_engine(db_config, echo=echo, echo_pool=echo, pool_use_lifo=True,
                                   pool_recycle=3600, pool_pre_ping=True, pool_size=50, **kwargs)

    @event.listens_for(new_engine, "connect")
    def connect(dbapi_connection, connection_record):
        connection_record.info['pid'] = os.getpid()

    @event.listens_for(new_engine, "checkout")
    def checkout(dbapi_connection, connection_record, connection_proxy):
        pid = os.getpid()
        if connection_record.info['pid'] != pid:
            connection_record.connection = connection_proxy.connection = None
            raise exc.DisconnectionError(
                "Connection record belongs to pid %s, "
                "attempting to check out in pid %s" %
                (connection_record.info['pid'], pid)
            )

    return new_engine