sdk/messaging/azservicebus/internal/errors.go (228 lines of code) (raw):
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package internal
import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"reflect"
"strings"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
"github.com/Azure/go-amqp"
)
type errNonRetriable struct {
Message string
}
func NewErrNonRetriable(message string) error {
return errNonRetriable{Message: message}
}
func (e errNonRetriable) Error() string { return e.Message }
// RecoveryKind dictates what kind of recovery is possible. Used with
// GetRecoveryKind().
type RecoveryKind string
const (
RecoveryKindNone RecoveryKind = ""
RecoveryKindFatal RecoveryKind = "fatal"
RecoveryKindLink RecoveryKind = "link"
RecoveryKindConn RecoveryKind = "connection"
)
func IsFatalSBError(err error) bool {
return GetRecoveryKind(err) == RecoveryKindFatal
}
// TransformError will create a proper error type that users
// can potentially inspect.
// If the error is actionable then it'll be of type exported.Error which
// has a 'Code' field that can be used programatically.
// If it's not actionable or if it's nil it'll just be returned.
func TransformError(err error) error {
if err == nil {
return nil
}
if ourErr := (*exported.Error)(nil); errors.As(err, &ourErr) {
// it's already been wrapped.
return err
}
if isLockLostError(err) {
return exported.NewError(exported.CodeLockLost, err)
}
if isMicrosoftTimeoutError(err) {
// one scenario where this error pops up is if you're waiting for an available
// session and there are none available. It waits, up to one minute, and then
// returns this error.
return exported.NewError(exported.CodeTimeout, err)
}
// there are a few errors that all boil down to "bad creds or unauthorized"
var amqpErr *amqp.Error
if errors.As(err, &amqpErr) && amqpErr.Condition == amqp.ErrCondUnauthorizedAccess {
return exported.NewError(exported.CodeUnauthorizedAccess, err)
}
var rpcErr RPCError
if errors.As(err, &rpcErr) {
switch rpcErr.Resp.Code {
case http.StatusUnauthorized:
return exported.NewError(exported.CodeUnauthorizedAccess, err)
case http.StatusNotFound:
return exported.NewError(exported.CodeNotFound, err)
}
}
rk := GetRecoveryKind(err)
switch rk {
case RecoveryKindLink:
// note that we could give back a more differentiated error code
// here but it's probably best to just give the customer the simplest
// recovery mechanism possible.
return exported.NewError(exported.CodeConnectionLost, err)
case RecoveryKindConn:
return exported.NewError(exported.CodeConnectionLost, err)
default:
// isn't one of our specifically called out cases so we'll just return it.
return err
}
}
func isMicrosoftTimeoutError(err error) bool {
var amqpErr *amqp.Error
if errors.As(err, &amqpErr) && amqpErr.Condition == amqp.ErrCond("com.microsoft:timeout") {
return true
}
return false
}
func IsLinkError(err error) bool {
var de *amqp.LinkError
return errors.As(err, &de)
}
func IsNotAllowedError(err error) bool {
var e *amqp.Error
return errors.As(err, &e) &&
e.Condition == amqp.ErrCondNotAllowed
}
func IsCancelError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return true
}
if err.Error() == "context canceled" { // go-amqp is returning this when I cancel
return true
}
return false
}
func IsDrainingError(err error) bool {
// TODO: we should be able to identify these errors programatically
return strings.Contains(err.Error(), "link is currently draining")
}
const errorConditionLockLost = amqp.ErrCond("com.microsoft:message-lock-lost")
var amqpConditionsToRecoveryKind = map[amqp.ErrCond]RecoveryKind{
// no recovery needed, these are temporary errors.
amqp.ErrCond("com.microsoft:server-busy"): RecoveryKindNone,
amqp.ErrCond("com.microsoft:timeout"): RecoveryKindNone,
amqp.ErrCond("com.microsoft:operation-cancelled"): RecoveryKindNone,
// Link recovery needed
amqp.ErrCondDetachForced: RecoveryKindLink, // "amqp:link:detach-forced"
amqp.ErrCondTransferLimitExceeded: RecoveryKindLink, // "amqp:link:transfer-limit-exceeded"
// Connection recovery needed
amqp.ErrCondConnectionForced: RecoveryKindConn, // "amqp:connection:forced"
amqp.ErrCondInternalError: RecoveryKindConn, // "amqp:internal-error"
// No recovery possible - this operation is non retriable.
// ErrCondResourceLimitExceeded comes back if the entity is actually full.
amqp.ErrCondResourceLimitExceeded: RecoveryKindFatal, // "amqp:resource-limit-exceeded"
amqp.ErrCondMessageSizeExceeded: RecoveryKindFatal, // "amqp:link:message-size-exceeded"
amqp.ErrCondUnauthorizedAccess: RecoveryKindFatal, // creds are bad
amqp.ErrCondNotFound: RecoveryKindFatal, // "amqp:not-found"
amqp.ErrCondNotAllowed: RecoveryKindFatal, // "amqp:not-allowed"
amqp.ErrCond("com.microsoft:entity-disabled"): RecoveryKindFatal, // entity is disabled in the portal
amqp.ErrCond("com.microsoft:session-cannot-be-locked"): RecoveryKindFatal,
errorConditionLockLost: RecoveryKindFatal,
}
// GetRecoveryKindForSession determines the recovery type for session-based links.
func GetRecoveryKindForSession(err error) RecoveryKind {
// when a session is detached there's a delay before we can reacquire the
// lock. So a lock lost on a session _is_ retryable.
if isLockLostError(err) {
return RecoveryKindLink
}
return GetRecoveryKind(err)
}
// GetRecoveryKind determines the recovery type for non-session based links.
func GetRecoveryKind(err error) RecoveryKind {
if err == nil {
return RecoveryKindNone
}
if IsCancelError(err) {
return RecoveryKindFatal
}
if errors.Is(err, amqpwrap.ErrConnResetNeeded) {
return RecoveryKindConn
}
var netErr net.Error
// these are errors that can flow from the go-amqp connection to
// us. There's work underway to improve this but for now we can handle
// these as "catastrophic" errors and reset everything.
if errors.Is(err, io.EOF) || errors.As(err, &netErr) {
return RecoveryKindConn
}
var errNonRetriable errNonRetriable
if errors.As(err, &errNonRetriable) {
return RecoveryKindFatal
}
// azidentity returns errors that match this for auth failures.
var errNonRetriableMarker interface {
NonRetriable()
error
}
if errors.As(err, &errNonRetriableMarker) {
return RecoveryKindFatal
}
// check the AMQP condition first since it's usually more specific than just knowing it came
// from a link, or a connection.
if amqpError := (*amqp.Error)(nil); errors.As(err, &amqpError) {
recoveryKind, ok := amqpConditionsToRecoveryKind[amqpError.Condition]
if ok {
return recoveryKind
}
}
// fall back to just checking where the error was delivered (ie, LinkError, ConnError, SessionError) - in most cases that should give
// us an idea of how localized the failure was.
if linkErr := (*amqp.LinkError)(nil); errors.As(err, &linkErr) {
return RecoveryKindLink
}
var connErr *amqp.ConnError
var sessionErr *amqp.SessionError
if errors.As(err, &connErr) ||
// session closures appear to leak through when the connection itself is going down.
errors.As(err, &sessionErr) {
return RecoveryKindConn
}
if IsDrainingError(err) {
// temporary, operation should just be retryable since drain will
// eventually complete.
return RecoveryKindNone
}
var rpcErr RPCError
if errors.As(err, &rpcErr) {
// Described more here:
// https://www.oasis-open.org/committees/download.php/54441/AMQP%20Management%20v1.0%20WD09
// > Unsuccessful operations MUST NOT result in a statusCode in the 2xx range as defined in Section 10.2 of [RFC2616]
// RFC2616 is the specification for HTTP.
code := rpcErr.RPCCode()
if code == http.StatusNotFound ||
code == RPCResponseCodeLockLost ||
code == http.StatusUnauthorized {
return RecoveryKindFatal
}
// simple timeouts
if rpcErr.Resp.Code == http.StatusRequestTimeout || rpcErr.Resp.Code == http.StatusServiceUnavailable ||
// internal server errors are worth retrying (they will typically lead
// to a more actionable error). A simple example of this is when you're
// in the middle of an operation and the link is detached. Sometimes you'll get
// the detached event immediately, but sometimes you'll get an intermediate 500
// indicating your original operation was cancelled.
rpcErr.Resp.Code == http.StatusInternalServerError {
return RecoveryKindNone
}
}
// this is some error type we've never seen - recover the entire connection.
return RecoveryKindConn
}
type (
// ErrMissingField indicates that an expected property was missing from an AMQP message. This should only be
// encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
ErrMissingField string
// ErrMalformedMessage indicates that a message was expected in the form of []byte was not a []byte. This is likely
// a bug and should be reported.
ErrMalformedMessage string
// ErrIncorrectType indicates that type assertion failed. This should only be encountered when there is an error
// with this library, or the server has altered its behavior unexpectedly.
ErrIncorrectType struct {
Key string
ExpectedType reflect.Type
ActualValue any
}
// ErrAMQP indicates that the server communicated an AMQP error with a particular
ErrAMQP amqpwrap.RPCResponse
// ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be
// more messages in the future.
ErrNoMessages struct{}
)
func (e ErrMissingField) Error() string {
return fmt.Sprintf("missing value %q", string(e))
}
func (e ErrMalformedMessage) Error() string {
return "message was expected in the form of []byte was not a []byte"
}
// NewErrIncorrectType lets you skip using the `reflect` package. Just provide a variable of the desired type as
// 'expected'.
func NewErrIncorrectType(key string, expected, actual any) ErrIncorrectType {
return ErrIncorrectType{
Key: key,
ExpectedType: reflect.TypeOf(expected),
ActualValue: actual,
}
}
func (e ErrIncorrectType) Error() string {
return fmt.Sprintf(
"value at %q was expected to be of type %q but was actually of type %q",
e.Key,
e.ExpectedType,
reflect.TypeOf(e.ActualValue))
}
func (e ErrAMQP) Error() string {
return fmt.Sprintf("server says (%d) %s", e.Code, e.Description)
}
func (e ErrNoMessages) Error() string {
return "no messages available"
}
func isLockLostError(err error) bool {
var rpcErr RPCError
// this is the error you get if you settle on the management$ link
// with an expired locktoken.
if errors.As(err, &rpcErr) && rpcErr.Resp.Code == RPCResponseCodeLockLost {
return true
}
var amqpErr *amqp.Error
// this is the error you get if you settle on the actual receiver link you
// got the message on with an expired locktoken.
if errors.As(err, &amqpErr) && amqpErr.Condition == errorConditionLockLost {
return true
}
return false
}