in aws_advanced_python_wrapper/utils/pg_exception_handler.py [0:0]
def is_network_exception(self, error: Optional[Exception] = None, sql_state: Optional[str] = None) -> bool:
    """Return whether the given error or SQL state indicates a network failure.

    The checks are applied in this order:

    1. ``error`` is a ``QueryTimeoutError`` or ``ConnectionTimeout``.
    2. The SQL state is a member of ``self._NETWORK_ERRORS``. An explicitly
       passed ``sql_state`` takes precedence; when it is ``None`` the
       ``sqlstate`` attribute of ``error`` is used, if present.
    3. ``error`` is an ``OperationalError`` whose first argument is a string
       containing ``self._CONNECTION_FAILED`` or
       ``self._CONSUMING_INPUT_FAILED``.

    :param error: the exception to inspect; may be ``None``.
    :param sql_state: an optional SQL state overriding the one carried by ``error``.
    :return: ``True`` if any of the checks above matches, ``False`` otherwise.
    """
    if isinstance(error, (QueryTimeoutError, ConnectionTimeout)):
        return True

    if sql_state is None:
        # Not every exception (and certainly not None) has a `sqlstate` attribute,
        # so fall back to None instead of letting getattr raise AttributeError.
        sql_state = getattr(error, "sqlstate", None)

    if sql_state is not None and sql_state in self._NETWORK_ERRORS:
        return True

    if isinstance(error, OperationalError):
        if not error.args:
            return False
        # Check the error message if this is a generic error
        error_msg = error.args[0]
        if not isinstance(error_msg, str):
            # Exception args are arbitrary objects; a substring test against a
            # non-string first argument would raise TypeError rather than answer the question.
            return False
        return self._CONNECTION_FAILED in error_msg or self._CONSUMING_INPUT_FAILED in error_msg

    return False