def get_connection()

in mysqlx-connector-python/lib/mysqlx/connection.py [0:0]


    def get_connection(self, settings: Dict[str, Any]) -> PooledConnection:
        """Get a connection from the pool.

        This method returns a `PooledConnection` instance which has a reference
        to the pool that created it, and can be used as a normal Connection.

        When the MySQL connection is not connected, a reconnect is attempted.

        Pools are tried one at a time. When a pool fails with an
        `InterfaceError`, `TimeoutError` or `PoolError`, the error is recorded,
        the pool is flagged as unavailable and the next pool of the same
        priority is tried, then the pools of the next priority returned by
        `_get_next_priority`.

        Args:
            settings: Connection settings used to look up the pools. The
                      ``"cur_priority"`` key is read, and overwritten with the
                      priority currently being tried.

        Raises:
            :class:`PoolError`: On errors.

        Returns:
            PooledConnection: A pooled connection object.
        """

        def set_mysqlx_wait_timeout(cnx: PooledConnection) -> None:
            """Set ``mysqlx_wait_timeout`` on `cnx` to the pool's max idle time."""
            ver = cnx.sql(_SELECT_VERSION_QUERY).execute().fetch_all()[0][0]
            # Compare the numeric part only (anything after "-" is dropped).
            # mysqlx_wait_timeout is only available on MySQL 8
            if tuple(int(n) for n in ver.split("-")[0].split(".")) > (
                8,
                0,
                10,
            ):
                cnx.sql(f"set mysqlx_wait_timeout = {pool.max_idle_time}").execute()

        def close_idle_connections(target: Any) -> None:
            """Close and untrack every connection still queued in `target`."""
            while target.qsize() > 0:
                try:
                    # A dedicated name is used on purpose: the caller's `cnx`
                    # (the connection about to be returned) must not be
                    # overwritten by the idle connections being discarded.
                    idle_cnx = target.get(block=True, timeout=target.queue_timeout)
                except queue.Empty:
                    # Nothing was handed out within the timeout; re-check size
                    continue
                try:
                    idle_cnx.close_connection()
                except (RuntimeError, OSError, InterfaceError):
                    pass
                finally:
                    # Untrack the connection even when closing it failed
                    target.remove_connection(idle_cnx)

        pools = self._get_pools(settings)
        cur_priority = settings.get("cur_priority", None)
        error_list = []
        self._check_unavailable_pools(settings)
        cur_priority = self._get_next_priority(pools, cur_priority)
        if cur_priority is None:
            raise PoolError(
                "Unable to connect to any of the target hosts. No pool is available"
            )
        settings["cur_priority"] = cur_priority
        pool = self._get_next_pool(pools, cur_priority)
        # NOTE(review): this lock is created on every call and is not shared
        # with anything outside this invocation, so it does not look like it
        # can serialize concurrent callers -- confirm whether a shared lock
        # was intended.
        lock = threading.RLock()
        while pool is not None:
            try:
                # Check connections availability in this pool
                if pool.qsize() > 0:
                    # We have connections in pool, try to return a working one
                    with lock:
                        try:
                            cnx = pool.get(block=True, timeout=pool.queue_timeout)
                        except queue.Empty:
                            raise PoolError(
                                "Failed getting connection; pool exhausted"
                            ) from None
                        try:
                            if cnx.is_server_disconnected():
                                pool.remove_connections()
                            # Only reset the connection by re-authentication
                            # if the connection was unable to keep open by the
                            # server
                            if not cnx.keep_open:
                                cnx.reset()
                            set_mysqlx_wait_timeout(cnx)
                        except (RuntimeError, OSError, InterfaceError):
                            # Unable to reset connection, close and remove
                            try:
                                cnx.close_connection()
                            except (RuntimeError, OSError, InterfaceError):
                                pass
                            finally:
                                pool.remove_connection(cnx)
                            # By WL#13222 all idle sessions that connect to the
                            # same endpoint should be removed from the pool.
                            close_idle_connections(pool)
                            # Connection was closed by the server, create new
                            try:
                                cnx = PooledConnection(pool)
                                pool.track_connection(cnx)
                                cnx.connect()
                                set_mysqlx_wait_timeout(cnx)
                            except (RuntimeError, OSError, InterfaceError):
                                # NOTE(review): the failure is swallowed and
                                # `cnx` is still returned below even though it
                                # could not be (re)connected -- confirm this is
                                # the intended contract for callers.
                                pass
                            finally:
                                # Take down any idle connection that reached
                                # the queue in the meantime; the connection
                                # being returned is left untouched.
                                close_idle_connections(pool)
                        return cnx
                elif pool.open_connections < pool.pool_max_size:
                    # No connections in pool, but we can open a new one
                    cnx = PooledConnection(pool)
                    pool.track_connection(cnx)
                    cnx.connect()
                    set_mysqlx_wait_timeout(cnx)
                    return cnx
                else:
                    # Pool is exhausted so the client needs to wait
                    with lock:
                        try:
                            cnx = pool.get(block=True, timeout=pool.queue_timeout)
                            cnx.reset()
                            set_mysqlx_wait_timeout(cnx)
                            return cnx
                        except queue.Empty:
                            raise PoolError("pool max size has been reached") from None
            except (InterfaceError, TimeoutError, PoolError) as err:
                error_list.append(f"pool: {pool} error: {err}")
                if isinstance(err, PoolError):
                    # Pool can be exhausted now but can be ready again in no
                    # time, e.g. a connection is returned to the pool.
                    # NOTE(review): 2 is presumably a short back-off; confirm
                    # its unit against `set_unavailable`.
                    pool.set_unavailable(2)
                else:
                    self.set_pool_unavailable(pool, err)

                self._check_unavailable_pools(settings)
                # Try next pool with the same priority
                pool = self._get_next_pool(pools, cur_priority)

                if pool is None:
                    # No pool left at this priority, move on to the next one
                    cur_priority = self._get_next_priority(pools, cur_priority)
                    settings["cur_priority"] = cur_priority
                    pool = self._get_next_pool(pools, cur_priority)
                    if pool is None:
                        # Every candidate failed: report all collected errors
                        msg = "\n  ".join(error_list)
                        raise PoolError(
                            "Unable to connect to any of the target hosts: "
                            f"[\n  {msg}\n]"
                        ) from err
                continue

        # Reached only when no pool was selected for the initial priority
        raise PoolError("Unable to connect to any of the target hosts")