def get_connection()

in stk-sample/lambda/stk-player-events-loader-mysql/package/mysqlx/connection.py [0:0]


    def get_connection(self, settings):
        """Get a connection from the pool.

        This method returns an `PooledConnection` instance which has a reference
        to the pool that created it, and can be used as a normal Connection.

        When the MySQL connection is not connected, a reconnect is attempted.

        Raises:
            :class:`PoolError`: On errors.

        Returns:
            PooledConnection: A pooled connection object.
        """
        pools = self._get_pools(settings)
        cur_priority = settings.get("cur_priority", None)
        error_list = []

        if cur_priority is None:
            cur_priority = self._get_next_priority(pools, cur_priority)
        settings["cur_priority"] = cur_priority
        self._check_unavailable_pools(settings)
        pool = self._get_next_pool(pools, cur_priority)

        while pool is not None:
            try:
                # Check connections aviability in this pool
                if pool.qsize() > 0:
                    # We have connections in pool, try to return a working one
                    with threading.RLock():
                        try:
                            cnx = pool.get(block=True,
                                           timeout=pool.queue_timeout)
                        except queue.Empty:
                            raise PoolError(
                                "Failed getting connection; pool exhausted")
                        try:
                            # Only reset the connection by re-authentification
                            # if the connection was unable to keep open by the
                            # server
                            if not cnx.keep_open:
                                cnx.reset()
                            ver = cnx.sql('show variables like "version"'
                                         ).execute().fetch_all()[0][1]
                        except (RuntimeError, socket.error, InterfaceError):
                            # Unable to reset connection, close and remove
                            try:
                                cnx.close_connection()
                            except (RuntimeError, socket.error, InterfaceError):
                                pass
                            finally:
                                pool.remove_connection(cnx)
                            # By WL#13222 all idle sessions that connect to the
                            # same endpoint should be removed from the pool.
                            while pool.qsize() > 0:
                                try:
                                    cnx = pool.get(block=True,
                                                   timeout=pool.queue_timeout)
                                except queue.Empty:
                                    pass
                                else:
                                    try:
                                        cnx.close_connection()
                                    except (RuntimeError, socket.error, InterfaceError):
                                        pass
                                    finally:
                                        pool.remove_connection(cnx)
                            # Connection was closed by the server, create new
                            try:
                                cnx = PooledConnection(pool)
                                pool.track_connection(cnx)
                                cnx.connect()
                                ver = cnx.sql('show variables like "version"'
                                             ).execute().fetch_all()[0][1]
                            except (RuntimeError, socket.error, InterfaceError):
                                pass
                            finally:
                                # Server must be down, take down idle
                                # connections from this pool
                                while pool.qsize() > 0:
                                    try:
                                        cnx = pool.get(block=True,
                                                       timeout=pool.queue_timeout)
                                        cnx.close_connection()
                                        pool.remove_connection(cnx)
                                    except (RuntimeError, socket.error, InterfaceError):
                                        pass
                        # mysqlx_wait_timeout is only available on MySQL 8
                        if tuple([int(n) for n in
                                  ver.split("-")[0].split(".")]) > (8, 0, 10):
                            cnx.sql("set mysqlx_wait_timeout = {}"
                                    "".format(pool.max_idle_time)).execute()
                        return cnx
                elif pool.open_connections < pool.pool_max_size:
                    # No connections in pool, but we can open a new one
                    cnx = PooledConnection(pool)
                    pool.track_connection(cnx)
                    cnx.connect()
                    # mysqlx_wait_timeout is only available on MySQL 8
                    ver = cnx.sql('show variables like "version"'
                                 ).execute().fetch_all()[0][1]
                    if tuple([int(n) for n in
                              ver.split("-")[0].split(".")]) > (8, 0, 10):
                        cnx.sql("set mysqlx_wait_timeout = {}"
                                "".format(pool.max_idle_time)).execute()
                    return cnx
                else:
                    # Pool is exaust so the client needs to wait
                    with threading.RLock():
                        try:
                            cnx = pool.get(block=True,
                                           timeout=pool.queue_timeout)
                            cnx.reset()
                            # mysqlx_wait_timeout is only available on MySQL 8
                            ver = cnx.sql('show variables like "version"'
                                         ).execute().fetch_all()[0][1]
                            if tuple([int(n) for n in
                                      ver.split("-")[0].split(".")]) > \
                                      (8, 0, 10):
                                cnx.sql("set mysqlx_wait_timeout = {}"
                                        "".format(pool.max_idle_time)
                                       ).execute()
                            return cnx
                        except queue.Empty:
                            raise PoolError("pool max size has been reached")
            except (InterfaceError, TimeoutError, PoolError) as err:
                error_list.append("pool: {} error: {}".format(pool, err))
                if isinstance(err, PoolError):
                    # Pool can be exhaust now but can be ready again in no time,
                    # e.g a connection is returned to the pool.
                    pool.set_unavailable(2)
                else:
                    penalty = None
                    for timeout_penalty in _TIMEOUT_PENALTIES:
                        if timeout_penalty in err.msg:
                            penalty = _TIMEOUT_PENALTIES[timeout_penalty]
                    if penalty:
                        pool.set_unavailable(penalty)
                    else:
                        # Other errors are severe punished
                        pool.set_unavailable(100000)

                self._check_unavailable_pools(settings)
                # Try next pool with the same priority
                pool = self._get_next_pool(pools, cur_priority)

                if pool is None:
                    cur_priority = self._get_next_priority(pools, cur_priority)
                    settings["cur_priority"] = cur_priority
                    pool = self._get_next_pool(pools, cur_priority)
                    if pool is None:
                        msg = "\n  ".join(error_list)
                        raise PoolError("Unable to connect to any of the "
                                        "target hosts: [\n  {}\n]".format(msg))
                continue