stream/clients/python/bookkeeper/common/exceptions.py (178 lines of code) (raw):
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Exceptions raised by bookkeeper clients.
This module provides base classes for all errors raised by libraries based
on :mod:`bookkeeper.common`.
"""
from __future__ import absolute_import
from __future__ import unicode_literals
from bookkeeper.proto import storage_pb2
import six
try:
import grpc
except ImportError: # pragma: NO COVER
grpc = None
# Lookup tables for mapping exceptions from gRPC transports.
# Populated by _APICallErrorMeta
_GRPC_CODE_TO_EXCEPTION = {}
class BKError(Exception):
"""Base class for all exceptions raised by bookkeeper Clients."""
pass
@six.python_2_unicode_compatible
class RetryError(BKError):
"""Raised when a function has exhausted all of its available retries.
Args:
message (str): The exception message.
cause (Exception): The last exception raised when retring the
function.
"""
def __init__(self, message, cause):
super(RetryError, self).__init__(message)
self.message = message
self._cause = cause
@property
def cause(self):
"""The last exception raised when retrying the function."""
return self._cause
def __str__(self):
return '{}, last exception: {}'.format(self.message, self.cause)
class _BKGRpcCallErrorMeta(type):
"""Metaclass for registering BKGRpcCallError subclasses."""
def __new__(mcs, name, bases, class_dict):
cls = type.__new__(mcs, name, bases, class_dict)
if cls.grpc_status_code is not None:
_GRPC_CODE_TO_EXCEPTION.setdefault(cls.grpc_status_code, cls)
return cls
@six.python_2_unicode_compatible
@six.add_metaclass(_BKGRpcCallErrorMeta)
class BKGrpcCallError(BKError):
"""Base class for exceptions raised by calling API methods.
Args:
message (str): The exception message.
errors (Sequence[Any]): An optional list of error details.
response (Union[requests.Request, grpc.Call]): The response or
gRPC call metadata.
"""
grpc_status_code = None
"""Optional[grpc.StatusCode]: The gRPC status code associated with this
error.
This may be ``None`` if the exception does not match up to a gRPC error.
"""
bk_status_code = None
"""Optional[bookkeeper.proto.StatusCode]: The bookkeeper storage status code
associated with this error.
This may be ```None` if the exception is a gRPC channel error.
"""
def __init__(self, message, errors=(), response=None):
super(BKGrpcCallError, self).__init__(message)
self.message = message
"""str: The exception message."""
self._errors = errors
self._response = response
def __str__(self):
return 'grpc_status_code = {}, bk_status_code = {} : {}'\
.format(self.grpc_status_code, self.bk_status_code, self.message)
@property
def errors(self):
"""Detailed error information.
Returns:
Sequence[Any]: A list of additional error details.
"""
return list(self._errors)
@property
def response(self):
"""Optional[Union[requests.Request, grpc.Call]]: The response or
gRPC call metadata."""
return self._response
class ClientError(BKGrpcCallError):
"""Base class for all client error responses."""
class BadRequest(ClientError):
"""Exception mapping a ``400 Bad Request`` response."""
code = storage_pb2.BAD_REQUEST
grpc_status_code =\
grpc.StatusCode.FAILED_PRECONDITION if grpc is not None else None
class IllegalOpError(ClientError):
"""Exception mapping a ``403 Illegal Op`` response."""
code = storage_pb2.ILLEGAL_OP
grpc_status_code =\
grpc.StatusCode.FAILED_PRECONDITION if grpc is not None else None
class ServerError(BKGrpcCallError):
"""Base for 5xx responses."""
class StorageContainerNotFoundError(BKGrpcCallError):
"""Exception raised when storage container is not found"""
code = None
grpc_status_code =\
grpc.StatusCode.NOT_FOUND if grpc is not None else None
class InternalServerError(ServerError):
"""Exception mapping a ``500 Internal Server Error`` response. or a
:attr:`grpc.StatusCode.INTERNAL` error."""
code = storage_pb2.INTERNAL_SERVER_ERROR
grpc_status_code =\
grpc.StatusCode.INTERNAL if grpc is not None else None
class UnimplementedError(ServerError):
code = storage_pb2.NOT_IMPLEMENTED
grpc_status_code =\
grpc.StatusCode.UNIMPLEMENTED if grpc is not None else None
class UnexpectedError(BKGrpcCallError):
code = storage_pb2.UNEXPECTED
grpc_status_code =\
grpc.StatusCode.UNKNOWN if grpc is not None else None
class StorageError(BKGrpcCallError):
grpc_status_code = None
class FailureError(StorageError):
code = storage_pb2.FAILURE
class BadVersionError(StorageError):
code = storage_pb2.BAD_VERSION
class BadRevisionError(StorageError):
code = storage_pb2.BAD_REVISION
class NamespaceError(StorageError):
code = storage_pb2.UNEXPECTED
class InvalidNamespaceNameError(NamespaceError):
code = storage_pb2.INVALID_NAMESPACE_NAME
class NamespaceNotFoundError(NamespaceError):
code = storage_pb2.NAMESPACE_NOT_FOUND
class NamespaceExistsError(NamespaceError):
code = storage_pb2.NAMESPACE_EXISTS
class StreamError(StorageError):
code = storage_pb2.UNEXPECTED
class InvalidStreamNameError(StreamError):
code = storage_pb2.INVALID_STREAM_NAME
class StreamNotFoundError(StreamError):
code = storage_pb2.STREAM_NOT_FOUND
class StreamExistsError(StreamError):
code = storage_pb2.STREAM_EXISTS
class TableError(StorageError):
code = storage_pb2.UNEXPECTED
class InvalidKeyError(TableError):
code = storage_pb2.INVALID_KEY
class KeyNotFoundError(TableError):
code = storage_pb2.KEY_NOT_FOUND
class KeyExistsError(TableError):
code = storage_pb2.KEY_EXISTS
_BK_CODE_TO_EXCEPTION_ = {
storage_pb2.FAILURE: FailureError,
storage_pb2.BAD_REQUEST: BadRequest,
storage_pb2.ILLEGAL_OP: IllegalOpError,
storage_pb2.INTERNAL_SERVER_ERROR: InternalServerError,
storage_pb2.NOT_IMPLEMENTED: UnimplementedError,
storage_pb2.UNEXPECTED: UnexpectedError,
storage_pb2.BAD_VERSION: BadVersionError,
storage_pb2.BAD_REVISION: BadRevisionError,
storage_pb2.INVALID_NAMESPACE_NAME: InvalidNamespaceNameError,
storage_pb2.NAMESPACE_EXISTS: NamespaceExistsError,
storage_pb2.NAMESPACE_NOT_FOUND: NamespaceNotFoundError,
storage_pb2.INVALID_STREAM_NAME: InvalidStreamNameError,
storage_pb2.STREAM_EXISTS: StreamExistsError,
storage_pb2.STREAM_NOT_FOUND: StreamNotFoundError,
storage_pb2.INVALID_KEY: InvalidKeyError,
storage_pb2.KEY_EXISTS: KeyExistsError,
storage_pb2.KEY_NOT_FOUND: KeyNotFoundError
}
def exception_class_for_grpc_status(status_code):
"""Return the exception class for a specific :class:`grpc.StatusCode`.
Args:
status_code (grpc.StatusCode): The gRPC status code.
Returns:
:func:`type`: the appropriate subclass of :class:`BKGrpcCallError`.
"""
return _GRPC_CODE_TO_EXCEPTION.get(status_code, BKGrpcCallError)
def from_grpc_status(status_code, message, **kwargs):
"""Create a :class:`BKGrpcCallError` from a :class:`grpc.StatusCode`.
Args:
status_code (grpc.StatusCode): The gRPC status code.
message (str): The exception message.
kwargs: Additional arguments passed to the :class:`BKGrpcCallError`
constructor.
Returns:
BKGrpcCallError: An instance of the appropriate subclass of
:class:`BKGrpcCallError`.
"""
error_class = exception_class_for_grpc_status(status_code)
error = error_class(message, **kwargs)
if error.grpc_status_code is None:
error.grpc_status_code = status_code
return error
def from_grpc_error(rpc_exc):
"""Create a :class:`BKGrpcCallError` from a :class:`grpc.RpcError`.
Args:
rpc_exc (grpc.RpcError): The gRPC error.
Returns:
BKGrpcCallError: An instance of the appropriate subclass of
:class:`BKGrpcError`.
"""
if isinstance(rpc_exc, grpc.Call):
return from_grpc_status(
rpc_exc.code(),
rpc_exc.details(),
errors=(rpc_exc,),
response=rpc_exc)
elif isinstance(rpc_exc, grpc.RpcError):
return from_grpc_error(
rpc_exc.code(),
rpc_exc.details(),
errors=(rpc_exc,),
response=rpc_exc
)
else:
return BKGrpcCallError(
str(rpc_exc), errors=(rpc_exc,), response=rpc_exc)
def exception_class_for_bk_status_code(status_code):
return _BK_CODE_TO_EXCEPTION_.get(status_code, BKGrpcCallError)
def from_bk_status(status_code, message, **kwargs):
error_class = exception_class_for_bk_status_code(status_code)
error = error_class(message, **kwargs)
if error.bk_status_code is None:
error.bk_status_code = status_code
return error
def from_table_rpc_response(rpc_resp):
routing_header = rpc_resp.header
status_code = routing_header.code
if storage_pb2.SUCCESS == status_code:
return rpc_resp
else:
raise from_bk_status(
status_code,
"",
errors=(rpc_resp,),
response=rpc_resp)
def from_root_range_rpc_response(rpc_resp):
status_code = rpc_resp.code
if storage_pb2.SUCCESS == status_code:
return rpc_resp
else:
raise from_bk_status(
status_code,
"",
errors=(rpc_resp,),
response=rpc_resp)