v2/settlehandler.go (120 lines of code) (raw):
package shuttle
import (
"context"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/devigned/tab"
)
// settlementTimeout is the timeout applied to the context that settle derives
// from the handler context before invoking a settlement function.
const settlementTimeout = 30 * time.Second
// Settlement is the action a handler wants applied to a received message.
// The implementations in this file are Abandon, Complete, DeadLetter, Defer and NoOp.
type Settlement interface {
	// Settle applies the action to message, using settler to talk to the broker.
	Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)
}
// Abandon is the settlement that makes a message available again from the
// queue or subscription. Doing so increments the message's delivery count and
// may cause it to be dead-lettered, depending on your queue or subscription's
// configuration.
type Abandon struct {
	options *azservicebus.AbandonMessageOptions
}

// Settle abandons message through settler, forwarding the configured options.
// It returns nothing; the shared settle routine logs any failure.
func (a *Abandon) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) {
	opts := a.options
	abandonSettlement.settle(ctx, settler, message, opts)
}
// Complete is the settlement that completes a message, deleting it from the
// queue or subscription.
type Complete struct {
	options *azservicebus.CompleteMessageOptions
}

// Settle completes message through settler, forwarding the configured options.
// It returns nothing; the shared settle routine logs any failure.
func (c *Complete) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) {
	opts := c.options
	completeSettlement.settle(ctx, settler, message, opts)
}
// DeadLetter is the settlement that moves a message to the dead letter queue
// of its queue or subscription. To process deadlettered messages, create a
// receiver with `Client.NewReceiverForQueue()` or
// `Client.NewReceiverForSubscription()` using the `ReceiverOptions.SubQueue` option.
type DeadLetter struct {
	options *azservicebus.DeadLetterOptions
}

// Settle dead-letters message through settler, forwarding the configured options.
// It returns nothing; the shared settle routine logs any failure.
func (d *DeadLetter) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) {
	opts := d.options
	deadLetterSettlement.settle(ctx, settler, message, opts)
}
// Defer is the settlement that defers a message. Deferred messages are set
// aside and can only be received using `Receiver.ReceiveDeferredMessages`.
type Defer struct {
	options *azservicebus.DeferMessageOptions
}

// Settle defers message through settler, forwarding the configured options.
// It returns nothing; the shared settle routine logs any failure.
func (d *Defer) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) {
	opts := d.options
	deferSettlement.settle(ctx, settler, message, opts)
}
// NoOp is the settlement that leaves the message untouched. The handler exits
// without settling, so the message's peek lock is left to expire; the broker
// then increments the delivery count or moves the message to the deadletter,
// depending on the queue or subscription's configuration.
type NoOp struct{}

// Settle performs no broker call. It only records the decision: a warning with
// the message's LockedUntil value on the context logger, and an info entry on
// the span logger. The settler argument is ignored.
func (n *NoOp) Settle(ctx context.Context, _ MessageSettler, message *azservicebus.ReceivedMessage) {
	span := tab.FromContext(ctx)
	logger := getLogger(ctx)
	logger.Warn(fmt.Sprintf("no op settlement. message lock is locked until: %s", message.LockedUntil))
	span.Logger().Info("no op settlement. message lock is not released")
}
// Settler is a message handler expressed as a function: it inspects the
// message and returns the Settlement that should be applied to it.
type Settler func(ctx context.Context, message *azservicebus.ReceivedMessage) Settlement

// Handle calls the underlying function with ctx and message and returns its
// Settlement unchanged.
func (fn Settler) Handle(ctx context.Context, message *azservicebus.ReceivedMessage) Settlement {
	outcome := fn(ctx, message)
	return outcome
}
// SettlementHandlerOptions configures the handler created by NewSettlementHandler.
type SettlementHandlerOptions struct {
// OnNilSettlement is called when the downstream handler returns a nil Settlement;
// the Settlement it returns is applied to the message instead.
// The default behavior is to panic.
OnNilSettlement func() Settlement
}
// NewSettlementHandler creates a middleware to use the Settlement api in the message handler implementation.
//
// handler is invoked for every message and the Settlement it returns is applied
// to that message. When handler returns nil, OnNilSettlement supplies the
// Settlement instead; the default OnNilSettlement panics with a descriptive message.
//
// opts may be nil. A nil opts.OnNilSettlement keeps the default callback rather
// than replacing it with a nil func.
func NewSettlementHandler(opts *SettlementHandlerOptions, handler Settler) HandlerFunc {
	options := &SettlementHandlerOptions{
		OnNilSettlement: func() Settlement {
			panic("you must return a settlement from the message handler " +
				"or override the OnNilSettlement option to handle nil Settlement")
		},
	}
	// Override the default only when a callback was actually supplied. Copying a
	// nil func here would replace the descriptive panic above with an opaque
	// nil-function call panic the first time a handler returns nil.
	if opts != nil && opts.OnNilSettlement != nil {
		options.OnNilSettlement = opts.OnNilSettlement
	}
	return func(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) {
		s := handler(ctx, message)
		if s == nil {
			// this panics unless the user overrides the OnNilSettlement option
			s = options.OnNilSettlement()
		}
		// NOTE: a user-supplied OnNilSettlement that itself returns nil still
		// results in a panic on the call below.
		s.Settle(ctx, settler, message)
	}
}
// settlement pairs a human-readable action name with the function that
// performs that action. T is the options type handed through to settleFunc.
type settlement[T any] struct {
	settleFunc func(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage, options T) error
	name       string
}

// settle runs the settlement action for message under a context limited to
// settlementTimeout. The attempt is logged on both the span logger and the
// context logger. A failure is wrapped with the action name and logged on both
// loggers; it is not returned to the caller.
func (s settlement[T]) settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage, options T) {
	span := tab.FromContext(ctx)
	span.Logger().Info(fmt.Sprintf("%s message", s.name))

	timeoutCtx, release := context.WithTimeout(ctx, settlementTimeout)
	defer release()

	getLogger(ctx).Info(fmt.Sprintf("%s message with ID: %s", s.name, message.MessageID))

	err := s.settleFunc(timeoutCtx, settler, message, options)
	if err == nil {
		return
	}

	// the processing will terminate and the lock on the message will eventually be released after
	// the message lock expires on the broker side
	failure := fmt.Errorf("%s settlement failed: %w", s.name, err)
	getLogger(ctx).Error(failure.Error())
	span.Logger().Error(failure)
}
// abandonSettlement is the settlement used by Abandon; its action calls
// AbandonMessage on the settler.
var abandonSettlement = settlement[*azservicebus.AbandonMessageOptions]{
	name: "abandon",
	settleFunc: func(ctx context.Context, ms MessageSettler, msg *azservicebus.ReceivedMessage, opts *azservicebus.AbandonMessageOptions) error {
		return ms.AbandonMessage(ctx, msg, opts)
	},
}
// completeSettlement is the settlement used by Complete; its action calls
// CompleteMessage on the settler.
var completeSettlement = settlement[*azservicebus.CompleteMessageOptions]{
	name: "complete",
	settleFunc: func(ctx context.Context, ms MessageSettler, msg *azservicebus.ReceivedMessage, opts *azservicebus.CompleteMessageOptions) error {
		return ms.CompleteMessage(ctx, msg, opts)
	},
}
// deferSettlement is the settlement used by Defer; its action calls
// DeferMessage on the settler.
var deferSettlement = settlement[*azservicebus.DeferMessageOptions]{
	name: "defer",
	settleFunc: func(ctx context.Context, ms MessageSettler, msg *azservicebus.ReceivedMessage, opts *azservicebus.DeferMessageOptions) error {
		return ms.DeferMessage(ctx, msg, opts)
	},
}
// deadLetterSettlement is the settlement used by DeadLetter; its action calls
// DeadLetterMessage on the settler.
var deadLetterSettlement = settlement[*azservicebus.DeadLetterOptions]{
	name: "dead letter",
	settleFunc: func(ctx context.Context, ms MessageSettler, msg *azservicebus.ReceivedMessage, opts *azservicebus.DeadLetterOptions) error {
		return ms.DeadLetterMessage(ctx, msg, opts)
	},
}