uamqp/errors.py (254 lines of code) (raw):

#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

from uamqp import c_uamqp, constants, utils


def _process_send_error(policy, condition, description=None, info=None):
    """Build the exception for a failed message send and attach a retry action.

    :param policy: Object providing ``on_unrecognized_error`` and
     ``on_message_error``; each is called with the new exception and its
     return value is stored on ``exception.action``.
    :param condition: Raw error condition. If ``constants.ErrorCodes`` accepts
     it, a ``MessageSendFailed`` is built; otherwise a ``MessageException``
     carrying the raw condition.
    :param description: Optional error description.
    :param info: Optional additional error information.
    :returns: The constructed exception (it is returned, not raised).
    """
    try:
        amqp_condition = constants.ErrorCodes(condition)
    except ValueError:
        exception = MessageException(condition, description, info)
        exception.action = policy.on_unrecognized_error(exception)
    else:
        exception = MessageSendFailed(amqp_condition, description, info)
        exception.action = policy.on_message_error(exception)
    return exception


def _process_link_error(policy, condition, description=None, info=None):
    """Build the exception for a link detach and attach a retry action.

    :param policy: Object providing ``on_unrecognized_error`` and
     ``on_link_error``; the return value is stored on ``exception.action``.
    :param condition: Raw error condition. Unrecognized values produce a
     ``VendorLinkDetach``; ``ErrorCodes.LinkRedirect`` produces a
     ``LinkRedirect``; any other recognized value produces a ``LinkDetach``.
    :param description: Optional error description.
    :param info: Optional additional error information.
    :returns: The constructed exception (it is returned, not raised).
    """
    try:
        amqp_condition = constants.ErrorCodes(condition)
    except ValueError:
        exception = VendorLinkDetach(condition, description, info)
        exception.action = policy.on_unrecognized_error(exception)
    else:
        if amqp_condition == constants.ErrorCodes.LinkRedirect:
            exception = LinkRedirect(amqp_condition, description, info)
        else:
            exception = LinkDetach(amqp_condition, description, info)
        exception.action = policy.on_link_error(exception)
    return exception


def _process_connection_error(policy, condition, description=None, info=None):
    """Build the exception for a connection close and attach a retry action.

    :param policy: Object providing ``on_unrecognized_error`` and
     ``on_connection_error``; the return value is stored on
     ``exception.action``.
    :param condition: Raw error condition. Unrecognized values produce a
     ``VendorConnectionClose``; recognized values a ``ConnectionClose``.
    :param description: Optional error description.
    :param info: Optional additional error information.
    :returns: The constructed exception (it is returned, not raised).
    """
    try:
        amqp_condition = constants.ErrorCodes(condition)
    except ValueError:
        exception = VendorConnectionClose(condition, description, info)
        exception.action = policy.on_unrecognized_error(exception)
    else:
        exception = ConnectionClose(amqp_condition, description, info)
        exception.action = policy.on_connection_error(exception)
    return exception


def _decode_text(value, encoding):
    """Return ``value`` unchanged if it is a ``str``, else ``value.decode(encoding)``."""
    return value if isinstance(value, str) else value.decode(encoding)


def _build_error_message(condition, description, encoding):
    """Compose the ``"<condition>: <description>"`` text used by error classes.

    :param condition: Either a ``constants.ErrorCodes`` member (rendered with
     ``str``) or an object with a ``decode`` method (decoded with ``encoding``).
    :param description: Optional description; appended only when truthy.
    :param encoding: Encoding used to decode non-``str`` values.
    :returns: The composed message text.
    """
    if isinstance(condition, constants.ErrorCodes):
        message = str(condition)
    else:
        message = condition.decode(encoding)
    if description:
        message += u": {}".format(_decode_text(description, encoding))
    return message


class ErrorAction(object):
    """The decision an ``ErrorPolicy`` returns for a given error.

    :param retry: Whether the failed operation should be retried; coerced
     to ``bool``.
    :param backoff: Backoff value stored as given (this module does not
     interpret it; units are defined by the consumer).
    :param increment_retries: Stored as given; presumably tells the consumer
     whether this failure counts against its retry budget.
    """

    # Class-level defaults; ``retry`` is shadowed per instance in __init__.
    retry = True
    fail = False

    def __init__(self, retry, backoff=0, increment_retries=True):
        self.retry = bool(retry)
        self.backoff = backoff
        self.increment_retries = increment_retries


class ErrorPolicy(object):
    """Maps errors to an ``ErrorAction``.

    :param max_retries: Stored on the instance; not consulted in this class.
    :param on_error: Optional callable invoked with the exception for
     unrecognized error conditions; its return value is used as the action.
    """

    # Conditions for which every ``on_*_error`` handler returns retry=False.
    no_retry = (
        constants.ErrorCodes.DecodeError,
        constants.ErrorCodes.LinkMessageSizeExceeded,
        constants.ErrorCodes.NotFound,
        constants.ErrorCodes.NotImplemented,
        constants.ErrorCodes.LinkRedirect,
        constants.ErrorCodes.NotAllowed,
        constants.ErrorCodes.UnauthorizedAccess,
        constants.ErrorCodes.LinkStolen,
        constants.ErrorCodes.ResourceLimitExceeded,
        constants.ErrorCodes.ConnectionRedirect,
        constants.ErrorCodes.PreconditionFailed,
        constants.ErrorCodes.InvalidField,
        constants.ErrorCodes.ResourceDeleted,
        constants.ErrorCodes.IllegalState,
        constants.ErrorCodes.FrameSizeTooSmall,
        constants.ErrorCodes.ConnectionFramingError,
        constants.ErrorCodes.SessionUnattachedHandle,
        constants.ErrorCodes.SessionHandleInUse,
        constants.ErrorCodes.SessionErrantLink,
        constants.ErrorCodes.SessionWindowViolation
    )

    def __init__(self, max_retries=3, on_error=None):
        self.max_retries = max_retries
        self._on_error = on_error

    def on_unrecognized_error(self, error):
        """Return the action for an error whose condition was not recognized.

        Delegates to the ``on_error`` callable when one was supplied,
        otherwise retries.
        """
        if self._on_error:
            return self._on_error(error)
        return ErrorAction(retry=True)

    def on_message_error(self, error):
        """Return the action for a message send error."""
        if error.condition in self.no_retry:
            return ErrorAction(retry=False)
        return ErrorAction(retry=True, increment_retries=True)

    def on_link_error(self, error):
        """Return the action for a link detach error."""
        if error.condition in self.no_retry:
            return ErrorAction(retry=False)
        return ErrorAction(retry=True)

    def on_connection_error(self, error):
        """Return the action for a connection close error.

        Every condition outside ``no_retry`` is retried; a forced connection
        close is not treated differently from the others.
        """
        if error.condition in self.no_retry:
            return ErrorAction(retry=False)
        return ErrorAction(retry=True)


class AMQPError(Exception):
    """Base class for the AMQP errors defined in this module."""


class AMQPClientShutdown(KeyboardInterrupt):
    """Keyboard interrupt carrying a fixed client-shutdown message."""

    def __init__(self):
        message = "Client shutdown with keyboard interrupt."
        super(AMQPClientShutdown, self).__init__(message)


class AMQPConnectionError(AMQPError):
    """Base class for connection-level errors."""


class MessageHandlerError(AMQPConnectionError):
    """Connection-level error subtype; adds no behaviour of its own."""


class ConnectionClose(AMQPConnectionError):
    """Error describing a closed connection.

    :param condition: ``constants.ErrorCodes`` member, or an encoded value
     that is decoded with ``encoding`` for the message text.
    :param description: Optional description (``str`` or encoded).
    :param info: Optional additional error information, stored as given.
    :param encoding: Encoding used to decode non-``str`` values.
    """

    def __init__(self, condition, description=None, info=None, encoding="UTF-8"):
        self._encoding = encoding
        self.condition = condition
        self.description = description
        self.info = info
        self.action = None  # Filled in later with the policy's ErrorAction.
        message = _build_error_message(condition, description, encoding)
        super(ConnectionClose, self).__init__(message)


class VendorConnectionClose(ConnectionClose):
    """``ConnectionClose`` used when the condition is not a known error code."""


class LinkDetach(AMQPConnectionError):
    """Error describing a detached link.

    :param condition: ``constants.ErrorCodes`` member, or an encoded value
     that is decoded with ``encoding`` for the message text.
    :param description: Optional description (``str`` or encoded).
    :param info: Optional additional error information, stored as given.
    :param encoding: Encoding used to decode non-``str`` values.
    """

    def __init__(self, condition, description=None, info=None, encoding="UTF-8"):
        self._encoding = encoding
        self.condition = condition
        self.description = description
        self.info = info
        self.action = None  # Filled in later with the policy's ErrorAction.
        message = _build_error_message(condition, description, encoding)
        super(LinkDetach, self).__init__(message)


class VendorLinkDetach(LinkDetach):
    """``LinkDetach`` used when the condition is not a known error code."""


class LinkRedirect(LinkDetach):
    """``LinkDetach`` that exposes the redirect fields found in ``info``.

    The attributes ``hostname``, ``network_host``, ``port``, ``address``,
    ``scheme`` and ``path`` are read from the byte-string keys of ``info``
    and are ``None`` when a key is absent or when no ``info`` was supplied.
    """

    def __init__(self, condition, description=None, info=None, encoding="UTF-8"):
        # ``info`` defaults to None; fall back to an empty mapping so the
        # lookups below yield None instead of raising AttributeError.
        redirect = info or {}
        self.hostname = redirect.get(b'hostname')
        self.network_host = redirect.get(b'network-host')
        self.port = redirect.get(b'port')
        self.address = redirect.get(b'address')
        self.scheme = redirect.get(b'scheme')
        self.path = redirect.get(b'path')
        super(LinkRedirect, self).__init__(condition, description, info, encoding)


class ClientTimeout(AMQPError):
    """AMQP error subtype for client timeouts; adds no behaviour of its own."""


class AuthenticationException(AMQPError):
    """Base class for authentication errors."""


class TokenExpired(AuthenticationException):
    """Authentication error subtype; adds no behaviour of its own."""


class TokenAuthFailure(AuthenticationException):
    """CBS token authentication failure.

    :param status_code: Status code included in the message text.
    :param description: Description (``str`` or encoded); appended to the
     message only when truthy.
    :param encoding: Encoding used to decode a non-``str`` description.
    """

    def __init__(self, status_code, description, encoding="UTF-8"):
        self._encoding = encoding
        self.status_code = status_code
        self.description = description
        message = "CBS Token authentication failed.\nStatus code: {}".format(self.status_code)
        if self.description:
            message += u"\nDescription: {}".format(
                _decode_text(self.description, self._encoding))
        super(TokenAuthFailure, self).__init__(message)


class MessageResponse(AMQPError):
    """Base class for message disposition responses.

    :param message: Optional message text; when falsy a default naming the
     concrete class is used.
    """

    def __init__(self, message=None):
        response = message or "Sending {} disposition.".format(self.__class__.__name__)
        super(MessageResponse, self).__init__(response)


class MessageException(MessageResponse):
    """Message-level error carrying a condition, description and info.

    :param condition: ``constants.ErrorCodes`` member, or an encoded value
     that is decoded with ``encoding`` for the message text.
    :param description: Optional description (``str`` or encoded).
    :param info: Optional additional error information, stored as given.
    :param encoding: Encoding used to decode non-``str`` values.
    """

    def __init__(self, condition, description=None, info=None, encoding="UTF-8"):
        self._encoding = encoding
        self.condition = condition
        self.description = description
        self.info = info
        self.action = None  # Filled in later with the policy's ErrorAction.
        message = _build_error_message(condition, description, encoding)
        super(MessageException, self).__init__(message=message)


class MessageSendFailed(MessageException):
    """``MessageException`` used for send failures with a known error code."""


class ClientMessageError(MessageException):
    """``MessageException`` built from another exception.

    :param exception: Source exception. If it has a ``condition`` attribute,
     its ``condition`` and ``description`` are reused; otherwise the
     condition is ``ErrorCodes.ClientError`` and the description is
     ``str(exception)``.
    :param info: Optional additional error information.
    """

    def __init__(self, exception, info=None):
        if hasattr(exception, 'condition'):
            condition = exception.condition
            description = exception.description
        else:
            condition = constants.ErrorCodes.ClientError
            description = str(exception)
        super(ClientMessageError, self).__init__(condition, description=description, info=info)


class MessageAlreadySettled(MessageResponse):
    """Raised-style response for operating on an already settled message."""

    def __init__(self):
        response = "Invalid operation: this message is already settled."
        super(MessageAlreadySettled, self).__init__(response)

    def __reduce__(self):
        # __init__ takes no arguments, so rebuild with an empty args tuple.
        return (self.__class__, ())


class MessageAccepted(MessageResponse):
    """Disposition response subtype; adds no behaviour of its own."""


class MessageRejected(MessageResponse):
    """Disposition response carrying a rejection error.

    :param condition: Error condition; a ``str`` is encoded with ``encoding``.
     When falsy, ``b"amqp:internal-error"`` is used.
    :param description: Error description; a ``str`` is encoded with
     ``encoding``. When falsy, ``b""`` is used.
    :param encoding: Encoding applied to ``str`` values and passed to
     ``utils.data_factory``.
    :param info: Optional dictionary of error info.
    :raises TypeError: If ``info`` is truthy and not a ``dict``.
    """

    def __init__(self, condition=None, description=None, encoding='UTF-8', info=None):
        self._encoding = encoding
        self._info = info  # Kept unconverted so __reduce__ can rebuild the instance.
        if condition:
            self.error_condition = condition.encode(encoding) if isinstance(condition, str) else condition
        else:
            self.error_condition = b"amqp:internal-error"
        if description:
            self.error_description = description.encode(encoding) if isinstance(description, str) \
                else description
        else:
            self.error_description = b""
        if info and not isinstance(info, dict):
            raise TypeError("Disposition error info must be a dictionary.")
        self.error_info = utils.data_factory(info, encoding=encoding) if info else None
        super(MessageRejected, self).__init__()

    def __reduce__(self):
        # Argument order matches __init__: condition, description, encoding, info.
        return (self.__class__, (self.error_condition, self.error_description, self._encoding, self._info))


class MessageReleased(MessageResponse):
    """Disposition response subtype; adds no behaviour of its own."""


class MessageModified(MessageResponse):
    """Disposition response carrying modification flags and annotations.

    :param failed: Stored as given on ``self.failed``.
    :param undeliverable: Stored as given on ``self.undeliverable``.
    :param annotations: Optional dictionary of annotations.
    :param encoding: Encoding passed to ``utils.data_factory``.
    :raises TypeError: If ``annotations`` is truthy and not a ``dict``.
    """

    def __init__(self, failed, undeliverable, annotations=None, encoding='UTF-8'):
        self.failed = failed
        self.undeliverable = undeliverable
        self._encoding = encoding
        self._annotations = annotations  # Kept unconverted for __reduce__.
        if annotations and not isinstance(annotations, dict):
            raise TypeError("Disposition annotations must be a dictionary.")
        self.annotations = utils.data_factory(annotations, encoding=encoding) if annotations else None
        super(MessageModified, self).__init__()

    def __reduce__(self):
        # Argument order matches __init__: failed, undeliverable, annotations, encoding.
        return (self.__class__, (self.failed, self.undeliverable, self._annotations, self._encoding))


class ErrorResponse(object):
    """Normalized view of an error's condition, description and info.

    :param error_info: Either a ``c_uamqp.cError`` or a nested list shaped
     like ``[[condition, description, info], ...]`` (only the first inner
     list is read, and it may be shorter). Values found there override the
     explicit keyword arguments. Stored unchanged on ``self.error``.
    :param condition: Condition used when ``error_info`` supplies none.
    :param description: Description used when ``error_info`` supplies none.
    :param info: Info used when ``error_info`` supplies none. If the final
     info object has a ``value`` attribute, that value is stored instead.
    """

    def __init__(self, error_info=None, condition=None, description=None, info=None):
        # The ``info`` argument is kept as the fallback; it must not be reset
        # here, otherwise a caller-supplied value would always be discarded.
        self.condition = condition
        self.description = description
        self.info = info
        self.error = error_info
        if isinstance(error_info, c_uamqp.cError):
            self.condition = error_info.condition
            self.description = error_info.description
            info = error_info.info
        elif isinstance(error_info, list) and len(error_info) >= 1:
            details = error_info[0]
            if isinstance(details, list) and len(details) >= 1:
                self.condition = details[0]
                if len(details) >= 2:
                    self.description = details[1]
                if len(details) >= 3:
                    info = details[2]
        # Unwrap objects exposing ``.value``; plain objects are stored as-is.
        try:
            self.info = info.value
        except AttributeError:
            self.info = info


class MessageContentTooLarge(ValueError):
    """``ValueError`` with a fixed message for data exceeding a single message."""

    def __init__(self):
        message = "Data set too large for a single message."
        super(MessageContentTooLarge, self).__init__(message)