def catch_network_exception()

in mysqlx-connector-python/lib/mysqlx/connection.py [0:0]


def catch_network_exception(func: Callable) -> Callable:
    """Decorator used to catch OSError or RuntimeError.

    Raises:
        :class:`mysqlx.InterfaceError`: If `OSError` or `RuntimeError`
                                        is raised.
    """

    @wraps(func)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        """Wrapper function."""
        try:
            if (
                isinstance(self, (Connection, PooledConnection))
                and self.is_server_disconnected()
            ):
                raise InterfaceError(*self.get_disconnected_reason())
            result = func(self, *args, **kwargs)
            if isinstance(result, BaseResult):
                warns: Any = result.get_warnings()
                for warn in warns:
                    if warn["code"] in CONNECTION_CLOSED_ERROR:
                        error_msg = CONNECTION_CLOSED_ERROR[warn["code"]]
                        reason = (
                            f"Connection close: {warn['msg']}: {error_msg}",
                            warn["code"],
                        )
                        if isinstance(self, (Connection, PooledConnection)):
                            self.set_server_disconnected(reason)
                        break
            return result
        except (InterfaceError, OSError, RuntimeError, TimeoutError) as err:
            if (
                func.__name__ == "get_column_metadata"
                and args
                and isinstance(args[0], SqlResult)
            ):
                warns = args[0].get_warnings()
                if warns:
                    warn = warns[0]
                    error_msg = CONNECTION_CLOSED_ERROR[warn["code"]]
                    reason = (
                        f"Connection close: {warn['msg']}: {error_msg}",
                        warn["code"],
                    )
                    if isinstance(self, PooledConnection):
                        self.pool.remove_connections()
                        # pool must be listed as faulty if server is shutting down
                        if warn["code"] == 1053:
                            PoolsManager().set_pool_unavailable(
                                self.pool, InterfaceError(*reason)
                            )
                    if isinstance(self, (Connection, PooledConnection)):
                        self.set_server_disconnected(reason)
                    self.disconnect()
                    raise InterfaceError(*reason) from err
            self.disconnect()
            raise

    return wrapper