in stk-sample/lambda/stk-player-events-loader-mysql/package/mysqlx/connection.py [0:0]
def get_connection(self, settings):
    """Get a connection from the pool.

    This method returns a `PooledConnection` instance which has a reference
    to the pool that created it, and can be used as a normal Connection.

    When the MySQL connection is not connected, a reconnect is attempted.
    Pools are tried one at a time: when a pool fails it is marked as
    unavailable and the next pool of the same priority, then of the next
    priority, is tried.

    Args:
        settings (dict): Connection settings. The ``"cur_priority"`` entry
                         is read and updated in place.

    Raises:
        :class:`PoolError`: On errors, including when no pool is available
                            or every candidate pool failed.

    Returns:
        PooledConnection: A pooled connection object.
    """

    def _server_version(connection):
        """Return the version string reported by the server."""
        return connection.sql('show variables like "version"'
                              ).execute().fetch_all()[0][1]

    def _apply_idle_timeout(connection, source_pool, version):
        """Set the server-side idle timeout where the server supports it."""
        # mysqlx_wait_timeout is only available on MySQL 8
        numbers = tuple(int(n) for n in version.split("-")[0].split("."))
        if numbers > (8, 0, 10):
            connection.sql("set mysqlx_wait_timeout = {}"
                           "".format(source_pool.max_idle_time)).execute()

    def _discard_idle(source_pool):
        """Close and untrack every connection still idle in the pool."""
        while source_pool.qsize() > 0:
            try:
                idle_cnx = source_pool.get(
                    block=True, timeout=source_pool.queue_timeout)
            except queue.Empty:
                continue
            try:
                idle_cnx.close_connection()
            except (RuntimeError, socket.error, InterfaceError):
                pass
            finally:
                source_pool.remove_connection(idle_cnx)

    pools = self._get_pools(settings)
    cur_priority = settings.get("cur_priority", None)
    error_list = []
    if cur_priority is None:
        cur_priority = self._get_next_priority(pools, cur_priority)
        settings["cur_priority"] = cur_priority
    self._check_unavailable_pools(settings)
    pool = self._get_next_pool(pools, cur_priority)
    while pool is not None:
        try:
            # Check connections availability in this pool
            if pool.qsize() > 0:
                # We have connections in pool, try to return a working one.
                # NOTE(review): a new RLock is created on every call, so
                # this block does not serialise concurrent callers.
                with threading.RLock():
                    try:
                        cnx = pool.get(block=True,
                                       timeout=pool.queue_timeout)
                    except queue.Empty:
                        raise PoolError(
                            "Failed getting connection; pool exhausted")
                    try:
                        # Only reset the connection by re-authentication
                        # if the server was unable to keep it open
                        if not cnx.keep_open:
                            cnx.reset()
                        ver = _server_version(cnx)
                    except (RuntimeError, socket.error, InterfaceError):
                        # Unable to reset connection, close and remove
                        try:
                            cnx.close_connection()
                        except (RuntimeError, socket.error,
                                InterfaceError):
                            pass
                        finally:
                            pool.remove_connection(cnx)
                        # By WL#13222 all idle sessions that connect to the
                        # same endpoint should be removed from the pool.
                        _discard_idle(pool)
                        # Connection was closed by the server, create new
                        reconnect_error = None
                        try:
                            cnx = PooledConnection(pool)
                            pool.track_connection(cnx)
                            cnx.connect()
                            ver = _server_version(cnx)
                        except (RuntimeError, socket.error,
                                InterfaceError) as err:
                            reconnect_error = err
                        finally:
                            # Take down connections that became idle in
                            # the meantime; `cnx` must not be rebound here
                            # because it is the connection being returned.
                            _discard_idle(pool)
                        if reconnect_error is not None:
                            # Do not continue with an unusable connection
                            # and an unknown server version: report the
                            # failure so this pool gets penalized below.
                            raise reconnect_error
                    _apply_idle_timeout(cnx, pool, ver)
                    return cnx
            elif pool.open_connections < pool.pool_max_size:
                # No connections in pool, but we can open a new one
                cnx = PooledConnection(pool)
                pool.track_connection(cnx)
                cnx.connect()
                _apply_idle_timeout(cnx, pool, _server_version(cnx))
                return cnx
            else:
                # Pool is exhausted so the client needs to wait
                with threading.RLock():
                    try:
                        cnx = pool.get(block=True,
                                       timeout=pool.queue_timeout)
                    except queue.Empty:
                        raise PoolError("pool max size has been reached")
                    cnx.reset()
                    _apply_idle_timeout(cnx, pool, _server_version(cnx))
                    return cnx
        except (InterfaceError, TimeoutError, PoolError) as err:
            error_list.append("pool: {} error: {}".format(pool, err))
            if isinstance(err, PoolError):
                # Pool can be exhausted now but can be ready again in no
                # time, e.g. a connection is returned to the pool.
                pool.set_unavailable(2)
            else:
                penalty = None
                for timeout_penalty in _TIMEOUT_PENALTIES:
                    if timeout_penalty in err.msg:
                        penalty = _TIMEOUT_PENALTIES[timeout_penalty]
                if penalty:
                    pool.set_unavailable(penalty)
                else:
                    # Other errors are severely punished
                    pool.set_unavailable(100000)
            self._check_unavailable_pools(settings)
            # Try next pool with the same priority
            pool = self._get_next_pool(pools, cur_priority)
            if pool is None:
                # Nothing left at this priority, move on to the next one
                cur_priority = self._get_next_priority(pools, cur_priority)
                settings["cur_priority"] = cur_priority
                pool = self._get_next_pool(pools, cur_priority)
                if pool is None:
                    msg = "\n ".join(error_list)
                    raise PoolError("Unable to connect to any of the "
                                    "target hosts: [\n {}\n]".format(msg))
            continue
    # Only reached when no pool was available to begin with; raise instead
    # of implicitly returning None, as the documented contract requires.
    raise PoolError("Unable to connect to any of the target hosts")