sdk/messaging/azservicebus/session_receiver.go (162 lines of code) (raw):

// Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. package azservicebus import ( "context" "fmt" "time" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils" "github.com/Azure/go-amqp" ) // SessionReceiver is a Receiver that handles sessions. type SessionReceiver struct { inner *Receiver sessionID *string acceptNextTimeout time.Duration lockedUntil time.Time } // SessionReceiverOptions contains options for the `Client.AcceptSessionForQueue/Subscription` or `Client.AcceptNextSessionForQueue/Subscription` // functions. type SessionReceiverOptions struct { // ReceiveMode controls when a message is deleted from Service Bus. // // ReceiveModePeekLock is the default. The message is locked, preventing multiple // receivers from processing the message at once. You control the lock state of the message // using one of the message settlement functions like SessionReceiver.CompleteMessage(), which removes // it from Service Bus, or SessionReceiver.AbandonMessage(), which makes it available again. // // ReceiveModeReceiveAndDelete causes Service Bus to remove the message as soon // as it's received. // // More information about receive modes: // https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations ReceiveMode ReceiveMode } func toReceiverOptions(sropts *SessionReceiverOptions) *ReceiverOptions { if sropts == nil { return nil } return &ReceiverOptions{ ReceiveMode: sropts.ReceiveMode, } } type newSessionReceiverArgs struct { sessionID *string ns internal.NamespaceForAMQPLinks entity entity cleanupOnClose func() retryOptions RetryOptions acceptNextTimeout time.Duration } func newSessionReceiver(ctx context.Context, args newSessionReceiverArgs, options *ReceiverOptions) (*SessionReceiver, error) { sessionReceiver := &SessionReceiver{ sessionID: args.sessionID, lockedUntil: time.Time{}, } r, err := newReceiver(newReceiverArgs{ ns: args.ns, entity: args.entity, cleanupOnClose: args.cleanupOnClose, newLinkFn: sessionReceiver.newLink, getRecoveryKindFunc: internal.GetRecoveryKindForSession, retryOptions: args.retryOptions, }, options) if err != nil { return nil, err } sessionReceiver.acceptNextTimeout = args.acceptNextTimeout sessionReceiver.inner = r return sessionReceiver, nil } func (r *SessionReceiver) newLink(ctx context.Context, session amqpwrap.AMQPSession) (amqpwrap.AMQPSenderCloser, amqpwrap.AMQPReceiverCloser, error) { const sessionFilterName = "com.microsoft:session-filter" const code = uint64(0x00000137000000C) linkOptions := createLinkOptions(r.inner.receiveMode) if r.sessionID == nil { linkOptions.Filters = append(linkOptions.Filters, amqp.NewLinkFilter(sessionFilterName, code, nil)) } else { linkOptions.Filters = append(linkOptions.Filters, amqp.NewLinkFilter(sessionFilterName, code, r.sessionID)) } if r.acceptNextTimeout > 0 { if linkOptions.Properties == nil { linkOptions.Properties = map[string]any{} } // the remote side of this seems _very_ picky that the type not be larger than 32-bits. timeoutInMS := uint32(r.acceptNextTimeout / time.Millisecond) linkOptions.Properties["com.microsoft:timeout"] = timeoutInMS } link, err := session.NewReceiver(ctx, r.inner.amqpLinks.EntityPath(), linkOptions) if err != nil { return nil, nil, err } // check the session ID that came back - if we asked for a named session ID and didn't get it then // we failed to get the lock. // if we specified nil then we can _set_ our internally held session ID now that we know the value. receivedSessionID := link.LinkSourceFilterValue(sessionFilterName) receivedSessionIDStr, ok := receivedSessionID.(string) if !ok || (r.sessionID != nil && receivedSessionIDStr != *r.sessionID) { return nil, nil, fmt.Errorf("invalid type/value for returned sessionID(type:%T, value:%v)", receivedSessionID, receivedSessionID) } r.sessionID = &receivedSessionIDStr if props := link.Properties(); props != nil { if lockTimeTicks, hasLockTime := props["com.microsoft:locked-until-utc"].(int64); hasLockTime { r.lockedUntil = ticksToUnixTime(lockTimeTicks) } } return nil, link, nil } // ReceiveMessages receives a fixed number of messages, up to maxMessages. // This function will block until at least one message is received or until the ctx is cancelled. // If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable. func (r *SessionReceiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error) { return r.inner.ReceiveMessages(ctx, maxMessages, options) } // ReceiveDeferredMessages receives messages that were deferred using `Receiver.DeferMessage`. // If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable. func (r *SessionReceiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64, options *ReceiveDeferredMessagesOptions) ([]*ReceivedMessage, error) { return r.inner.ReceiveDeferredMessages(ctx, sequenceNumbers, options) } // PeekMessages will peek messages without locking or deleting messages. // // The SessionReceiver stores the last peeked sequence number internally, and will use it as the // start location for the next PeekMessages() call. You can override this behavior by passing an // explicit sequence number in [azservicebus.PeekMessagesOptions.FromSequenceNumber]. // // Messages that are peeked are not locked, so settlement methods like [SessionReceiver.CompleteMessage], // [SessionReceiver.AbandonMessage], [SessionReceiver.DeferMessage] or [SessionReceiver.DeadLetterMessage] will not work with them. // // If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable. // // For more information about peeking/message-browsing see https://aka.ms/azsdk/servicebus/message-browsing func (r *SessionReceiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekMessagesOptions) ([]*ReceivedMessage, error) { return r.inner.PeekMessages(ctx, maxMessageCount, options) } // Close permanently closes the receiver. func (r *SessionReceiver) Close(ctx context.Context) error { return r.inner.Close(ctx) } // CompleteMessage completes a message, deleting it from the queue or subscription. // If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable. func (r *SessionReceiver) CompleteMessage(ctx context.Context, message *ReceivedMessage, options *CompleteMessageOptions) error { return r.inner.CompleteMessage(ctx, message, options) } // AbandonMessage will cause a message to be returned to the queue or subscription. // This will increment its delivery count, and potentially cause it to be dead lettered // depending on your queue or subscription's configuration. // If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable. func (r *SessionReceiver) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error { return r.inner.AbandonMessage(ctx, message, options) } // DeferMessage will cause a message to be deferred. Deferred messages // can be received using `Receiver.ReceiveDeferredMessages`. // If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable. func (r *SessionReceiver) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error { return r.inner.DeferMessage(ctx, message, options) } // DeadLetterMessage settles a message by moving it to the dead letter queue for a // queue or subscription. To receive these messages create a receiver with `Client.NewReceiverForQueue()` // or `Client.NewReceiverForSubscription()` using the `ReceiverOptions.SubQueue` option. // If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable. func (r *SessionReceiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error { return r.inner.DeadLetterMessage(ctx, message, options) } // SessionID is the session ID for this SessionReceiver. func (sr *SessionReceiver) SessionID() string { // return the ultimately assigned session ID for this link (anonymous will get it from the // link filter options, non-anonymous is set in newSessionReceiver) return *sr.sessionID } // LockedUntil is the time the lock on this session expires. // The lock can be renewed using `SessionReceiver.RenewSessionLock`. func (sr *SessionReceiver) LockedUntil() time.Time { return sr.lockedUntil } // GetSessionStateOptions contains optional parameters for the GetSessionState function. type GetSessionStateOptions struct { // For future expansion } // GetSessionState retrieves state associated with the session. // If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable. func (sr *SessionReceiver) GetSessionState(ctx context.Context, options *GetSessionStateOptions) ([]byte, error) { var sessionState []byte err := sr.inner.amqpLinks.Retry(ctx, EventReceiver, "GetSessionState", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error { s, err := internal.GetSessionState(ctx, lwv.RPC, lwv.Receiver.LinkName(), sr.SessionID()) if err != nil { return err } sessionState = s return nil }, sr.inner.retryOptions) return sessionState, internal.TransformError(err) } // SetSessionStateOptions contains optional parameters for the SetSessionState function. type SetSessionStateOptions struct { // For future expansion } // SetSessionState sets the state associated with the session. // Pass nil for the state parameter to clear the stored session state. // If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable. func (sr *SessionReceiver) SetSessionState(ctx context.Context, state []byte, options *SetSessionStateOptions) error { err := sr.inner.amqpLinks.Retry(ctx, EventReceiver, "SetSessionState", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error { return internal.SetSessionState(ctx, lwv.RPC, lwv.Receiver.LinkName(), sr.SessionID(), state) }, sr.inner.retryOptions) return internal.TransformError(err) } // RenewSessionLockOptions contains optional parameters for the RenewSessionLock function. type RenewSessionLockOptions struct { // For future expansion } // RenewSessionLock renews this session's lock. The new expiration time is available // using `LockedUntil`. // If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable. func (sr *SessionReceiver) RenewSessionLock(ctx context.Context, options *RenewSessionLockOptions) error { err := sr.inner.amqpLinks.Retry(ctx, EventReceiver, "RenewSessionLock", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error { newLockedUntil, err := internal.RenewSessionLock(ctx, lwv.RPC, lwv.Receiver.LinkName(), *sr.sessionID) if err != nil { return err } sr.lockedUntil = newLockedUntil return nil }, sr.inner.retryOptions) return internal.TransformError(err) } // init ensures the link was created, guaranteeing that we get our expected session lock. func (sr *SessionReceiver) init(ctx context.Context) error { // initialize the links _, err := sr.inner.amqpLinks.Get(ctx) return internal.TransformError(err) } // 1970-01-01, represented in "ticks" (100ns per millisecond) (ie: .NET's time unit for DateTimeOffset) const epochTicks = int64(621355968000000000) func ticksToUnixTime(ticks int64) time.Time { // normalize our time so it starts from the Unix epoch, then convert from ticks // to milliseconds. millisFromTicks := (ticks - epochTicks) / 10000 return time.UnixMilli(millisFromTicks).UTC() }