in mysqlx-connector-python/lib/mysqlx/connection.py [0:0]
def get_connection(self, settings: Dict[str, Any]) -> PooledConnection:
    """Get a connection from the pool.

    The candidate pools are obtained from ``settings`` and tried one at a
    time. For each pool, in order of preference:

    1. if the pool has queued (idle) connections, one is taken and
       revalidated; when revalidation fails the idle connections of that
       pool are discarded and a replacement connection is opened;
    2. otherwise, if ``pool.open_connections`` is below
       ``pool.pool_max_size``, a new connection is opened;
    3. otherwise the call blocks on ``pool.get`` using
       ``pool.queue_timeout``.

    When a pool fails with ``InterfaceError``, ``TimeoutError`` or
    ``PoolError`` it is flagged as unavailable and the next pool with the
    same priority is tried, then the pools of the next priority.

    This method returns a `PooledConnection` instance which has a reference
    to the pool that created it, and can be used as a normal Connection.

    Args:
        settings: Connection settings. ``settings["cur_priority"]`` is read
            to seed the priority lookup and is updated in place with the
            priority currently in use.

    Raises:
        :class:`PoolError`: If no pool is available, or if every candidate
            pool failed (the per-pool errors are listed in the message).

    Returns:
        PooledConnection: A pooled connection object.
    """

    def set_mysqlx_wait_timeout(cnx: PooledConnection) -> None:
        # `pool` is resolved from the enclosing scope at call time, i.e. it
        # is the pool currently being tried by the loop below.
        ver = cnx.sql(_SELECT_VERSION_QUERY).execute().fetch_all()[0][0]
        # mysqlx_wait_timeout is only available on MySQL 8
        if tuple(int(n) for n in ver.split("-")[0].split(".")) > (8, 0, 10):
            cnx.sql(f"set mysqlx_wait_timeout = {pool.max_idle_time}").execute()

    def discard_idle_connections(target_pool: Any) -> None:
        """Close and untrack every connection still queued in the pool."""
        while target_pool.qsize() > 0:
            try:
                idle_cnx = target_pool.get(
                    block=True, timeout=target_pool.queue_timeout
                )
            except queue.Empty:
                # The queue was emptied between qsize() and get();
                # re-check the size instead of failing.
                continue
            try:
                idle_cnx.close_connection()
            except (RuntimeError, OSError, InterfaceError):
                pass
            finally:
                # Untrack the connection even when closing it failed.
                target_pool.remove_connection(idle_cnx)

    pools = self._get_pools(settings)
    # Read the stored priority before the availability check runs, keeping
    # the original evaluation order.
    cur_priority = settings.get("cur_priority")
    error_list = []
    self._check_unavailable_pools(settings)
    cur_priority = self._get_next_priority(pools, cur_priority)
    if cur_priority is None:
        raise PoolError(
            "Unable to connect to any of the target hosts. No pool is available"
        )
    settings["cur_priority"] = cur_priority
    pool = self._get_next_pool(pools, cur_priority)
    # NOTE(review): this lock is created on every call, so it is not shared
    # with other callers of get_connection(); kept as-is to preserve
    # behavior -- confirm whether a pool-level lock was intended.
    lock = threading.RLock()
    while pool is not None:
        try:
            # Check connections availability in this pool
            if pool.qsize() > 0:
                # We have connections in pool, try to return a working one
                with lock:
                    try:
                        cnx = pool.get(block=True, timeout=pool.queue_timeout)
                    except queue.Empty:
                        raise PoolError(
                            "Failed getting connection; pool exhausted"
                        ) from None
                    try:
                        if cnx.is_server_disconnected():
                            pool.remove_connections()
                        # Only reset the connection by re-authentication
                        # if the connection was unable to keep open by the
                        # server
                        if not cnx.keep_open:
                            cnx.reset()
                        set_mysqlx_wait_timeout(cnx)
                    except (RuntimeError, OSError, InterfaceError):
                        # Unable to reset connection, close and remove
                        try:
                            cnx.close_connection()
                        except (RuntimeError, OSError, InterfaceError):
                            pass
                        finally:
                            pool.remove_connection(cnx)
                        # By WL#13222 all idle sessions that connect to the
                        # same endpoint should be removed from the pool.
                        discard_idle_connections(pool)
                        # Connection was closed by the server, create new
                        try:
                            cnx = PooledConnection(pool)
                            pool.track_connection(cnx)
                            cnx.connect()
                            set_mysqlx_wait_timeout(cnx)
                        except (RuntimeError, OSError, InterfaceError):
                            # NOTE(review): the failure is swallowed and
                            # `cnx` is still returned below, possibly not
                            # connected -- confirm this is intended.
                            pass
                        finally:
                            # Take down any idle connection queued in the
                            # meantime. The helper uses its own local name,
                            # so the `cnx` returned below is not replaced
                            # by a connection that was just closed.
                            discard_idle_connections(pool)
                    return cnx
            elif pool.open_connections < pool.pool_max_size:
                # No connections in pool, but we can open a new one
                cnx = PooledConnection(pool)
                pool.track_connection(cnx)
                cnx.connect()
                set_mysqlx_wait_timeout(cnx)
                return cnx
            else:
                # Pool is exhausted so the client needs to wait
                with lock:
                    try:
                        cnx = pool.get(block=True, timeout=pool.queue_timeout)
                        cnx.reset()
                        set_mysqlx_wait_timeout(cnx)
                        return cnx
                    except queue.Empty:
                        raise PoolError("pool max size has been reached") from None
        except (InterfaceError, TimeoutError, PoolError) as err:
            error_list.append(f"pool: {pool} error: {err}")
            if isinstance(err, PoolError):
                # Pool can be exhausted now but can be ready again in no
                # time, e.g. a connection is returned to the pool.
                pool.set_unavailable(2)
            else:
                self.set_pool_unavailable(pool, err)
            self._check_unavailable_pools(settings)
            # Try next pool with the same priority
            pool = self._get_next_pool(pools, cur_priority)
            if pool is None:
                # No pool left at this priority, move on to the next one
                cur_priority = self._get_next_priority(pools, cur_priority)
                settings["cur_priority"] = cur_priority
                pool = self._get_next_pool(pools, cur_priority)
                if pool is None:
                    msg = "\n ".join(error_list)
                    raise PoolError(
                        "Unable to connect to any of the target hosts: "
                        f"[\n {msg}\n]"
                    ) from err
    # Reached only when no pool was selected before entering the loop.
    raise PoolError("Unable to connect to any of the target hosts")