v2/managedsettling.go (122 lines of code) (raw):

package shuttle import ( "context" "fmt" "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" ) // ManagedSettlingFunc allows to convert a function with the signature // func(context.Context, *azservicebus.ReceivedMessage) error // to the ManagedSettlingHandler interface. type ManagedSettlingFunc func(ctx context.Context, message *azservicebus.ReceivedMessage) error func (f ManagedSettlingFunc) Handle(ctx context.Context, message *azservicebus.ReceivedMessage) error { return f(ctx, message) } // ManagedSettlingHandler is the message Handler interface for the ManagedSettler. type ManagedSettlingHandler interface { Handle(context.Context, *azservicebus.ReceivedMessage) error } var _ Handler = (*ManagedSettler)(nil) // ManagedSettler is a middleware that allows to reduce the message handler signature to ManagedSettlingFunc type ManagedSettler struct { next ManagedSettlingHandler options *ManagedSettlingOptions } func (m *ManagedSettler) Handle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) { logger := getLogger(ctx) if err := m.next.Handle(ctx, message); err != nil { logger.Error(fmt.Sprintf("error returned from the handler. Calling ManagedSettler error handler. error: %s", err)) m.options.OnError(ctx, m.options, settler, message, err) return } settleCtx, cancel := context.WithTimeout(ctx, settlementTimeout) defer cancel() if err := settler.CompleteMessage(settleCtx, message, nil); err != nil { logger.Error(fmt.Sprintf("error completing message: %s", err)) m.options.OnAbandoned(ctx, message, err) return // if we fail to complete the message, we log the error and let the message lock expire. // we cannot do more at this point. } m.options.OnCompleted(ctx, message) } // RetryDecision allows to provide custom retry decision. type RetryDecision interface { // CanRetry inspects the error returned from the message handler, and the message itself to decide if it should be retried or not. CanRetry(err error, message *azservicebus.ReceivedMessage) bool } // MaxAttemptsRetryDecision defines how many delivery the handler allows before explicitly moving the message to the deadletter queue. // This requires the MaxDeliveryCount from the queue or subscription to be higher than the MaxAttempts property. // If the queue or subscription's MaxDeliveryCount is lower than MaxAttempts, // service bus will move the message to the DLQ before the handler reaches the MaxAttempts. type MaxAttemptsRetryDecision struct { MaxAttempts uint32 } func (d *MaxAttemptsRetryDecision) CanRetry(_ error, message *azservicebus.ReceivedMessage) bool { return message.DeliveryCount < d.MaxAttempts } // ManagedSettlingOptions allows to configure the ManagedSettling middleware type ManagedSettlingOptions struct { // Allows to override the built-in error handling logic. // OnError is called before any message settling action is taken. // the ManagedSettlingOptions struct is passed as an argument so that the configuration // like RetryDecision, RetryDelayStrategy and the post-settlement hooks can be reused and composed differently OnError func(ctx context.Context, opts *ManagedSettlingOptions, settler MessageSettler, message *azservicebus.ReceivedMessage, handleErr error) // RetryDecision is invoked to decide whether an error should be retried. // the default is to retry 5 times before moving the message to the deadletter. RetryDecision RetryDecision // RetryDelayStrategy is invoked when a message handling does not complete successfully // and the RetryDecision decides to retry the message. // The handler will sleep for the time calculated by the delayStrategy before Abandoning the message. RetryDelayStrategy RetryDelayStrategy // OnAbandoned is invoked when the handler returns an error. It is invoked after the message is abandoned. OnAbandoned func(context.Context, *azservicebus.ReceivedMessage, error) // OnDeadLettered is invoked after the ManagedSettling dead-letters a message. // this occurs when the RetryDecision.CanRetry implementation returns false following an error returned by the handler // It is invoked after the message is dead-lettered. OnDeadLettered func(context.Context, *azservicebus.ReceivedMessage, error) // OnCompleted is a func that is invoked when the handler does not return any error. it is invoked after the message is completed. OnCompleted func(context.Context, *azservicebus.ReceivedMessage) } // NewManagedSettlingHandler allows to configure Retry decision logic and delay strategy. // It also adapts the handler to let the user return an error from the handler, instead of a settlement. // the settlement is inferred from the handler's return value. // error -> abandon // nil -> complete // the RetryDecision can be overridden and can inspect the error returned to decide to retry the message or not. // this allows to define error types that shouldn't be retried (and moved directly to the deadletter queue) func NewManagedSettlingHandler(opts *ManagedSettlingOptions, handler ManagedSettlingHandler) *ManagedSettler { options := defaultManagedSettlingOptions() if opts != nil { if opts.OnError != nil { options.OnError = opts.OnError } if opts.RetryDecision != nil { options.RetryDecision = opts.RetryDecision } if opts.RetryDelayStrategy != nil { options.RetryDelayStrategy = opts.RetryDelayStrategy } if opts.OnCompleted != nil { options.OnCompleted = opts.OnCompleted } if opts.OnAbandoned != nil { options.OnAbandoned = opts.OnAbandoned } if opts.OnDeadLettered != nil { options.OnDeadLettered = opts.OnDeadLettered } } return &ManagedSettler{ next: handler, options: options, } } func defaultManagedSettlingOptions() *ManagedSettlingOptions { const ( defaultRetryDecisionMaxAttempts = 5 defaultDelay = 5 * time.Second ) return &ManagedSettlingOptions{ OnError: handleError, RetryDecision: &MaxAttemptsRetryDecision{MaxAttempts: defaultRetryDecisionMaxAttempts}, RetryDelayStrategy: &ConstantDelayStrategy{Delay: defaultDelay}, OnCompleted: func(_ context.Context, _ *azservicebus.ReceivedMessage) { }, OnAbandoned: func(_ context.Context, _ *azservicebus.ReceivedMessage, _ error) { }, OnDeadLettered: func(_ context.Context, _ *azservicebus.ReceivedMessage, _ error) { }, } } func handleError(ctx context.Context, options *ManagedSettlingOptions, settler MessageSettler, message *azservicebus.ReceivedMessage, handleErr error) { if handleErr == nil { handleErr = fmt.Errorf("nil error: %w", handleErr) } logger := getLogger(ctx) if !options.RetryDecision.CanRetry(handleErr, message) { logger.Error(fmt.Sprintf("moving message to dead letter queue because processing failed to an error: %s", handleErr)) deadLetterSettlement.settle(ctx, settler, message, &azservicebus.DeadLetterOptions{ Reason: to.Ptr("ManagedSettlingHandlerDeadLettering"), ErrorDescription: to.Ptr(handleErr.Error()), PropertiesToModify: nil, }) // this could be a special hook to have more control on deadlettering, but keeping it simple for now options.OnDeadLettered(ctx, message, handleErr) return } // the delay is implemented as an in-memory sleep before calling abandon. // this will continue renewing the lock on the message while we wait for this delay to pass. delay := options.RetryDelayStrategy.GetDelay(message.DeliveryCount) logger.Error(fmt.Sprintf("delay strategy return delay of %s", delay)) time.Sleep(delay) abandonSettlement.settle(ctx, settler, message, nil) options.OnAbandoned(ctx, message, handleErr) }