sdk/messaging/azservicebus/receiver.go (400 lines of code) (raw):
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package azservicebus
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils"
"github.com/Azure/go-amqp"
)
// ReceiveMode represents the lock style to use for a receiver - either
// `PeekLock` or `ReceiveAndDelete`
type ReceiveMode = exported.ReceiveMode
const (
// ReceiveModePeekLock will lock messages as they are received and can be settled
// using the Receiver's (Complete|Abandon|DeadLetter|Defer)Message
// functions.
ReceiveModePeekLock ReceiveMode = exported.PeekLock
// ReceiveModeReceiveAndDelete will delete messages as they are received.
ReceiveModeReceiveAndDelete ReceiveMode = exported.ReceiveAndDelete
)
// SubQueue allows you to target a subqueue of a queue or subscription.
// Ex: the dead letter queue (SubQueueDeadLetter).
type SubQueue int
const (
// SubQueueDeadLetter targets the dead letter queue for a queue or subscription.
SubQueueDeadLetter SubQueue = 1
// SubQueueTransfer targets the transfer dead letter queue for a queue or subscription.
SubQueueTransfer SubQueue = 2
)
// Receiver receives messages using pull based functions (ReceiveMessages).
type Receiver struct {
amqpLinks internal.AMQPLinks
cancelReleaser *atomic.Value
cleanupOnClose func()
entityPath string
lastPeekedSequenceNumber int64
maxAllowedCredits uint32
mu sync.Mutex
receiveMode ReceiveMode
receiving bool
retryOptions RetryOptions
settler *messageSettler
defaultReleaserTimeout time.Duration // defaults to 1min, settable for unit tests.
}
// ReceiverOptions contains options for the `Client.NewReceiverForQueue` or `Client.NewReceiverForSubscription`
// functions.
type ReceiverOptions struct {
// ReceiveMode controls when a message is deleted from Service Bus.
//
// ReceiveModePeekLock is the default. The message is locked, preventing multiple
// receivers from processing the message at once. You control the lock state of the message
// using one of the message settlement functions like Receiver.CompleteMessage(), which removes
// it from Service Bus, or Receiver.AbandonMessage(), which makes it available again.
//
// ReceiveModeReceiveAndDelete causes Service Bus to remove the message as soon
// as it's received.
//
// More information about receive modes:
// https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations
ReceiveMode ReceiveMode
// SubQueue should be set to connect to the sub queue (ex: dead letter queue)
// of the queue or subscription.
SubQueue SubQueue
}
// defaultLinkRxBuffer is the maximum number of transfer frames we can handle
// on the Receiver. This matches the current default window size that go-amqp
// uses for sessions.
const defaultLinkRxBuffer uint32 = 5000
func applyReceiverOptions(receiver *Receiver, entity *entity, options *ReceiverOptions) error {
if options == nil {
receiver.receiveMode = ReceiveModePeekLock
} else {
if err := checkReceiverMode(options.ReceiveMode); err != nil {
return err
}
receiver.receiveMode = options.ReceiveMode
if err := entity.SetSubQueue(options.SubQueue); err != nil {
return err
}
}
entityPath, err := entity.String()
if err != nil {
return err
}
receiver.entityPath = entityPath
return nil
}
type newReceiverArgs struct {
ns internal.NamespaceForAMQPLinks
entity entity
cleanupOnClose func()
getRecoveryKindFunc func(err error) internal.RecoveryKind
newLinkFn func(ctx context.Context, session amqpwrap.AMQPSession) (amqpwrap.AMQPSenderCloser, amqpwrap.AMQPReceiverCloser, error)
retryOptions RetryOptions
}
var emptyCancelFn = func() string {
return "empty"
}
func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, error) {
if err := args.ns.Check(); err != nil {
return nil, err
}
receiver := &Receiver{
cancelReleaser: &atomic.Value{},
cleanupOnClose: args.cleanupOnClose,
lastPeekedSequenceNumber: 0,
maxAllowedCredits: defaultLinkRxBuffer,
retryOptions: args.retryOptions,
defaultReleaserTimeout: time.Minute,
}
receiver.cancelReleaser.Store(emptyCancelFn)
if err := applyReceiverOptions(receiver, &args.entity, options); err != nil {
return nil, err
}
newLinkFn := receiver.newReceiverLink
if args.newLinkFn != nil {
newLinkFn = args.newLinkFn
}
receiver.amqpLinks = internal.NewAMQPLinks(internal.NewAMQPLinksArgs{
NS: args.ns,
EntityPath: receiver.entityPath,
CreateLinkFunc: newLinkFn,
GetRecoveryKindFunc: args.getRecoveryKindFunc,
})
// 'nil' settler handles returning an error message for receiveAndDelete links.
if receiver.receiveMode == ReceiveModePeekLock {
receiver.settler = newMessageSettler(receiver.amqpLinks, receiver.retryOptions)
} else {
receiver.settler = (*messageSettler)(nil)
}
return receiver, nil
}
func (r *Receiver) newReceiverLink(ctx context.Context, session amqpwrap.AMQPSession) (amqpwrap.AMQPSenderCloser, amqpwrap.AMQPReceiverCloser, error) {
linkOptions := createLinkOptions(r.receiveMode)
link, err := session.NewReceiver(ctx, r.entityPath, linkOptions)
return nil, link, err
}
// ReceiveMessagesOptions are options for the ReceiveMessages function.
type ReceiveMessagesOptions struct {
// TimeAfterFirstMessage controls how long, after a message has been received, before we return the
// accumulated batch of messages.
//
// Default value depends on the receive mode:
// - 20ms when the receiver is in ReceiveModePeekLock
// - 1s when the receiver is in ReceiveModeReceiveAndDelete
TimeAfterFirstMessage time.Duration
}
// ReceiveMessages receives a fixed number of messages, up to numMessages.
// This function will block until at least one message is received or until the ctx is cancelled.
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error) {
r.mu.Lock()
isReceiving := r.receiving
if !isReceiving {
r.receiving = true
defer func() {
r.mu.Lock()
r.receiving = false
r.mu.Unlock()
}()
}
r.mu.Unlock()
if isReceiving {
return nil, errors.New("receiver is already receiving messages. ReceiveMessages() cannot be called concurrently")
}
messages, err := r.receiveMessagesImpl(ctx, maxMessages, options)
return messages, internal.TransformError(err)
}
// ReceiveDeferredMessagesOptions contains optional parameters for the ReceiveDeferredMessages function.
type ReceiveDeferredMessagesOptions struct {
// For future expansion
}
// ReceiveDeferredMessages receives messages that were deferred using `Receiver.DeferMessage`.
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64, options *ReceiveDeferredMessagesOptions) ([]*ReceivedMessage, error) {
var receivedMessages []*ReceivedMessage
err := r.amqpLinks.Retry(ctx, EventReceiver, "receiveDeferredMessages", func(ctx context.Context, lwid *internal.LinksWithID, args *utils.RetryFnArgs) error {
amqpMessages, err := internal.ReceiveDeferred(ctx, lwid.RPC, lwid.Receiver.LinkName(), r.receiveMode, sequenceNumbers)
if err != nil {
return err
}
for _, amqpMsg := range amqpMessages {
receivedMsg := newReceivedMessage(amqpMsg, lwid.Receiver)
receivedMsg.settleOnMgmtLink = true
receivedMessages = append(receivedMessages, receivedMsg)
}
return nil
}, r.retryOptions)
return receivedMessages, internal.TransformError(err)
}
// PeekMessagesOptions contains options for the `Receiver.PeekMessages`
// function.
type PeekMessagesOptions struct {
// FromSequenceNumber is the sequence number to start with when peeking messages.
FromSequenceNumber *int64
}
// PeekMessages will peek messages without locking or deleting messages.
//
// The Receiver stores the last peeked sequence number internally, and will use it as the
// start location for the next PeekMessages() call. You can override this behavior by passing an
// explicit sequence number in [azservicebus.PeekMessagesOptions.FromSequenceNumber].
//
// Messages that are peeked are not locked, so settlement methods like [Receiver.CompleteMessage],
// [Receiver.AbandonMessage], [Receiver.DeferMessage] or [Receiver.DeadLetterMessage] will not work with them.
//
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
//
// For more information about peeking/message-browsing see https://aka.ms/azsdk/servicebus/message-browsing
func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekMessagesOptions) ([]*ReceivedMessage, error) {
var receivedMessages []*ReceivedMessage
err := r.amqpLinks.Retry(ctx, EventReceiver, "peekMessages", func(ctx context.Context, links *internal.LinksWithID, args *utils.RetryFnArgs) error {
var sequenceNumber = r.lastPeekedSequenceNumber + 1
updateInternalSequenceNumber := true
if options != nil && options.FromSequenceNumber != nil {
sequenceNumber = *options.FromSequenceNumber
updateInternalSequenceNumber = false
}
messages, err := internal.PeekMessages(ctx, links.RPC, links.Receiver.LinkName(), sequenceNumber, int32(maxMessageCount))
if err != nil {
return err
}
receivedMessages = make([]*ReceivedMessage, len(messages))
for i := 0; i < len(messages); i++ {
receivedMessages[i] = newReceivedMessage(messages[i], links.Receiver)
}
if len(receivedMessages) > 0 && updateInternalSequenceNumber {
// only update this if they're doing the implicit iteration as part of the receiver.
r.lastPeekedSequenceNumber = *receivedMessages[len(receivedMessages)-1].SequenceNumber
}
return nil
}, r.retryOptions)
return receivedMessages, internal.TransformError(err)
}
// RenewMessageLockOptions contains optional parameters for the RenewMessageLock function.
type RenewMessageLockOptions struct {
// For future expansion
}
// RenewMessageLock renews the lock on a message, updating the `LockedUntil` field on `msg`.
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
func (r *Receiver) RenewMessageLock(ctx context.Context, msg *ReceivedMessage, options *RenewMessageLockOptions) error {
err := r.amqpLinks.Retry(ctx, EventReceiver, "renewMessageLock", func(ctx context.Context, linksWithVersion *internal.LinksWithID, args *utils.RetryFnArgs) error {
newExpirationTime, err := internal.RenewLocks(ctx, linksWithVersion.RPC, msg.linkName, []amqp.UUID{
(amqp.UUID)(msg.LockToken),
})
if err != nil {
return err
}
msg.LockedUntil = &newExpirationTime[0]
return nil
}, r.retryOptions)
return internal.TransformError(err)
}
// Close permanently closes the receiver.
func (r *Receiver) Close(ctx context.Context) error {
cancelReleaser := r.cancelReleaser.Swap(emptyCancelFn).(func() string)
releaserID := cancelReleaser()
r.amqpLinks.Writef(EventReceiver, "Stopped message releaser with ID '%s'", releaserID)
r.cleanupOnClose()
return r.amqpLinks.Close(ctx, true)
}
// CompleteMessage completes a message, deleting it from the queue or subscription.
// This function can only be used when the Receiver has been opened with ReceiveModePeekLock.
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
func (r *Receiver) CompleteMessage(ctx context.Context, message *ReceivedMessage, options *CompleteMessageOptions) error {
return r.settler.CompleteMessage(ctx, message, options)
}
// AbandonMessage will cause a message to be available again from the queue or subscription.
// This will increment its delivery count, and potentially cause it to be dead-lettered
// depending on your queue or subscription's configuration.
// This function can only be used when the Receiver has been opened with `ReceiveModePeekLock`.
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
func (r *Receiver) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error {
return r.settler.AbandonMessage(ctx, message, options)
}
// DeferMessage will cause a message to be deferred. Deferred messages can be received using
// `Receiver.ReceiveDeferredMessages`.
// This function can only be used when the Receiver has been opened with `ReceiveModePeekLock`.
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
func (r *Receiver) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error {
return r.settler.DeferMessage(ctx, message, options)
}
// DeadLetterMessage settles a message by moving it to the dead letter queue for a
// queue or subscription. To receive these messages create a receiver with `Client.NewReceiverForQueue()`
// or `Client.NewReceiverForSubscription()` using the `ReceiverOptions.SubQueue` option.
// This function can only be used when the Receiver has been opened with `ReceiveModePeekLock`.
// If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable.
func (r *Receiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error {
return r.settler.DeadLetterMessage(ctx, message, options)
}
func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error) {
cancelReleaser := r.cancelReleaser.Swap(emptyCancelFn).(func() string)
_ = cancelReleaser()
if maxMessages <= 0 {
return nil, internal.NewErrNonRetriable("maxMessages should be greater than 0")
}
if maxMessages > int(r.maxAllowedCredits) {
return nil, internal.NewErrNonRetriable(fmt.Sprintf("maxMessages cannot exceed %d", r.maxAllowedCredits))
}
var linksWithID *internal.LinksWithID
err := r.amqpLinks.Retry(ctx, EventReceiver, "receiveMessages.getlinks", func(ctx context.Context, lwid *internal.LinksWithID, args *utils.RetryFnArgs) error {
linksWithID = lwid
return nil
}, r.retryOptions)
if err != nil {
return nil, err
}
// request just the right amount of credits, taking into account the credits
// that are already active on the link from prior ReceiveMessages() calls that
// might have exited before all credits were used up.
currentReceiverCredits := int64(linksWithID.Receiver.Credits())
creditsToIssue := int64(maxMessages) - currentReceiverCredits
if creditsToIssue > 0 {
r.amqpLinks.Writef(EventReceiver, "Issuing %d credits, have %d", creditsToIssue, currentReceiverCredits)
if err := linksWithID.Receiver.IssueCredit(uint32(creditsToIssue)); err != nil {
return nil, err
}
} else {
r.amqpLinks.Writef(EventReceiver, "Have %d credits, no new credits needed", currentReceiverCredits)
}
timeAfterFirstMessage := 20 * time.Millisecond
if options != nil && options.TimeAfterFirstMessage > 0 {
timeAfterFirstMessage = options.TimeAfterFirstMessage
} else if r.receiveMode == ReceiveModeReceiveAndDelete {
timeAfterFirstMessage = time.Second
}
result := r.fetchMessages(ctx, linksWithID.Receiver, maxMessages, timeAfterFirstMessage)
r.amqpLinks.Writef(EventReceiver, "Received %d/%d messages", len(result.Messages), maxMessages)
// this'll only close anything if the error indicates that the link/connection is bad.
// it's safe to call with cancellation errors.
rk := r.amqpLinks.CloseIfNeeded(context.Background(), result.Error)
if rk == internal.RecoveryKindNone {
// The link is still alive - we'll start the releaser which will releasing any messages
// that arrive between this call and the next call to ReceiveMessages().
//
// This prevents us from holding onto messages for a long period of time in our
// internal cache where they'll just keep expiring.
releaserFunc := r.newReleaserFunc(linksWithID.Receiver)
go releaserFunc()
} else {
r.amqpLinks.Writef(EventReceiver, "Failure when receiving messages: %s", result.Error)
}
// If the user does get some messages we ignore 'error' and return only the messages.
//
// Doing otherwise would break the idiom that people are used to where people expected
// a non-nil error to mean any other values in the return are nil or not useful (ie,
// partial success is not idiomatic).
//
// This is mostly safe because the next call to ReceiveMessages() (or any other
// function on Receiver).will have the same issue and will return the relevant error
// at that time
if len(result.Messages) == 0 {
if internal.IsCancelError(result.Error) || rk == internal.RecoveryKindFatal {
return nil, result.Error
}
return nil, nil
}
var receivedMessages []*ReceivedMessage
for _, msg := range result.Messages {
receivedMessages = append(receivedMessages, newReceivedMessage(msg, linksWithID.Receiver))
}
return receivedMessages, nil
}
type entity struct {
subqueue SubQueue
Queue string
Topic string
Subscription string
}
func (e *entity) String() (string, error) {
entityPath := ""
if e.Queue != "" {
entityPath = e.Queue
} else if e.Topic != "" && e.Subscription != "" {
entityPath = fmt.Sprintf("%s/Subscriptions/%s", e.Topic, e.Subscription)
} else {
return "", errors.New("a queue or subscription was not specified")
}
if e.subqueue == SubQueueDeadLetter {
entityPath += "/$DeadLetterQueue"
} else if e.subqueue == SubQueueTransfer {
entityPath += "/$Transfer/$DeadLetterQueue"
}
return entityPath, nil
}
func (e *entity) SetSubQueue(subQueue SubQueue) error {
if subQueue == 0 {
return nil
} else if subQueue == SubQueueDeadLetter || subQueue == SubQueueTransfer {
e.subqueue = subQueue
return nil
}
return fmt.Errorf("unknown SubQueue %d", subQueue)
}
func createLinkOptions(mode ReceiveMode) *amqp.ReceiverOptions {
receiveMode := amqp.ReceiverSettleModeSecond
if mode == ReceiveModeReceiveAndDelete {
receiveMode = amqp.ReceiverSettleModeFirst
}
receiverOpts := &amqp.ReceiverOptions{
SettlementMode: receiveMode.Ptr(),
Credit: -1,
}
if mode == ReceiveModeReceiveAndDelete {
receiverOpts.RequestedSenderSettleMode = amqp.SenderSettleModeSettled.Ptr()
}
return receiverOpts
}
func checkReceiverMode(receiveMode ReceiveMode) error {
if receiveMode == ReceiveModePeekLock || receiveMode == ReceiveModeReceiveAndDelete {
return nil
}
return fmt.Errorf("invalid receive mode %d, must be either azservicebus.PeekLock or azservicebus.ReceiveAndDelete", receiveMode)
}
// fetchMessagesResult is the result from a fetchMessages
// call.
// NOTE: that you can get both an error and messages!
type fetchMessagesResult struct {
Messages []*amqp.Message
Error error
}
// fetchMessages receives messages, blocking indefinitely until at least one
// message arrives, the parentCtx parameter is cancelled, or the receiver itself
// is disconnected from Service Bus.
//
// Note, if you want to only receive prefetched messages send the parentCtx in
// pre-cancelled. This will cause us to only flush the prefetch buffer.
func (r *Receiver) fetchMessages(parentCtx context.Context, receiver amqpwrap.AMQPReceiver, count int, timeAfterFirstMessage time.Duration) fetchMessagesResult {
// The first receive is a bit special - we activate a short timer after this
// so the user doesn't end up in a situation where we're holding onto a bunch
// of messages but never return because they never cancelled and we never
// received all 'count' number of messages.
firstMsg, err := receiver.Receive(parentCtx, nil)
if err != nil {
// drain the prefetch buffer - we're stopping because of a
// failure on the link/connection _or_ the user cancelled the
// operation.
return fetchMessagesResult{
Error: err,
// Since our link is always active it's possible some
// messages were sitting in the prefetched buffer from before
//
// This particularly affects us in ReceiveAndDelete mode since the
// local copy of the message can never be retrieved from the server
// again (they're pre-settled).
Messages: getAllPrefetched(receiver, count),
}
}
messages := []*amqp.Message{firstMsg}
// after we get one message we will try to receive as much as we can
// during the `timeAfterFirstMessage` duration.
ctx, cancel := context.WithTimeout(parentCtx, timeAfterFirstMessage)
defer cancel()
var lastErr error
for i := 0; i < count-1; i++ {
msg, err := receiver.Receive(ctx, nil)
if err != nil {
lastErr = err
break
}
messages = append(messages, msg)
}
// drain the prefetch buffer - we're stopping because of a
// failure on the link/connection _or_ the user cancelled the
// operation.
messages = append(messages, getAllPrefetched(receiver, count-len(messages))...)
if internal.IsCancelError(lastErr) {
return fetchMessagesResult{
Messages: messages,
// we might have cancelled here (because we stop receiving after `timeAfterFirstMessage` expires)
// or _they_ cancelled the ReceiveMessages() call.
//
// If we cancel: we want a nil error since there's no failure. In that case parentCtx.Err() is nil
// If they cancel: we want to forward on their cancellation error.
Error: parentCtx.Err(),
}
} else {
return fetchMessagesResult{
Error: lastErr,
Messages: messages,
}
}
}
// newReleaserFunc creates a function that continually receives on a
// receiver and amqpReceiver.Release(msg)'s until cancelled. We use this
// lieu of a 'drain' strategy so we don't hold onto messages in our internal
// cache only for them to expire.
func (r *Receiver) newReleaserFunc(receiver amqpwrap.AMQPReceiver) func() {
if r.receiveMode == ReceiveModeReceiveAndDelete {
// you can't disposition messages that are received in this mode - these messages
// are "presettled" so we do NOT want to discard these messages.
return func() {}
}
ctx, cancel := context.WithCancel(context.Background())
releaseLoopDone := make(chan struct{})
released := 0
// this func gets called when a new ReceiveMessages() starts
r.cancelReleaser.Store(func() string {
cancel()
<-releaseLoopDone
return receiver.LinkName()
})
return func() {
defer close(releaseLoopDone)
for {
msg, err := receiver.Receive(ctx, nil)
if err == nil {
releaseCtx, cancelRelease := context.WithTimeout(context.Background(), r.defaultReleaserTimeout)
// We don't use `ctx` here to avoid cancelling Release(), and leaving this message
// in limbo until it expires.
err = receiver.ReleaseMessage(releaseCtx, msg)
cancelRelease()
if err == nil {
released++
}
}
// We check `ctx.Err()` here, instead of testing the returned err from .Receive(), because Receive()
// ignores cancellation if it has any messages in its prefetch queue.
if ctx.Err() != nil {
if released > 0 {
r.amqpLinks.Writef(exported.EventReceiver, "Message releaser pausing. Released %d messages", released)
}
break
} else if internal.GetRecoveryKind(err) != internal.RecoveryKindNone {
r.amqpLinks.Writef(exported.EventReceiver, "Message releaser stopping because of link failure. Released %d messages. Will start again after next receive: %s", released, err)
break
}
}
}
}
func getAllPrefetched(receiver amqpwrap.AMQPReceiver, max int) []*amqp.Message {
var messages []*amqp.Message
for i := 0; i < max; i++ {
msg := receiver.Prefetched()
if msg == nil {
break
}
messages = append(messages, msg)
}
return messages
}