receiver.go (631 lines of code) (raw):

package amqp import ( "bytes" "context" "errors" "fmt" "sync" "sync/atomic" "github.com/Azure/go-amqp/internal/buffer" "github.com/Azure/go-amqp/internal/debug" "github.com/Azure/go-amqp/internal/encoding" "github.com/Azure/go-amqp/internal/frames" "github.com/Azure/go-amqp/internal/queue" ) // Default link options const ( defaultLinkCredit = 1 ) // Receiver receives messages on a single AMQP link. type Receiver struct { l link // message receiving receiverReady chan struct{} // receiver sends on this when mux is paused to indicate it can handle more messages messagesQ *queue.Holder[Message] // used to send completed messages to receiver txDisposition chan frameBodyEnvelope // used to funnel disposition frames through the mux // NOTE: this will need to be retooled if/when we need to support resuming links. // at present, this is only used for debug tracing purposes so it's safe to change it to a count. unsettledMessages int32 // count of unsettled messages for this receiver; MUST be atomically accessed msgBuf buffer.Buffer // buffered bytes for current message more bool // if true, buf contains a partial message msg Message // current message being decoded settlementCount uint32 // the count of settled messages settlementCountMu sync.Mutex // must be held when accessing settlementCount autoSendFlow bool // automatically send flow frames as credit becomes available inFlight inFlight // used to track message disposition when rcv-settle-mode == second creditor creditor // manages credits via calls to IssueCredit/DrainCredit } // IssueCredit adds credits to be requested in the next flow request. // Attempting to issue more credit than the receiver's max credit as // specified in ReceiverOptions.MaxCredit will result in an error. func (r *Receiver) IssueCredit(credit uint32) error { if r.autoSendFlow { return errors.New("issueCredit can only be used with receiver links using manual credit management") } if err := r.creditor.IssueCredit(credit); err != nil { return err } // cause mux() to check our flow conditions. select { case r.receiverReady <- struct{}{}: default: } return nil } // DrainCreditOptions contains any optional values for the Receiver.DrainCredit method. type DrainCreditOptions struct { // for future expansion } // DrainCredit sets the drain flag on the next outbound FLOW frame and blocks until // the corresponding FLOW frame is received. While a drain is in progress, messages // can continue to arrive. After a drain completes, the Receiver will have // zero active credits. To begin receiving again, call IssueCredit() to add active credits // to your Receiver. // // You may only have a single Drain operation active, at a time. // // If the context passed to DrainCredit expires or is cancelled then the receiver's // issued credits should be considered ambiguous. // // Returns nil if the drain has completed, error otherwise. // // NOTE: The behavior of drain is optional, as per the AMQP spec. Check with your individual // broker's documentation for implementation details. func (r *Receiver) DrainCredit(ctx context.Context, _ *DrainCreditOptions) error { if r.autoSendFlow { return errors.New("drain can only be used with receiver links using manual credit management") } return r.creditor.Drain(ctx, r) } // Prefetched returns the next message that is stored in the Receiver's // prefetch cache. It does NOT wait for the remote sender to send messages // and returns immediately if the prefetch cache is empty. To receive from the // prefetch and wait for messages from the remote Sender use `Receive`. // // Once a message is received, and if the sender is configured in any mode other // than SenderSettleModeSettled, you *must* take an action on the message by calling // one of the following: AcceptMessage, RejectMessage, ReleaseMessage, ModifyMessage. func (r *Receiver) Prefetched() *Message { select { case r.receiverReady <- struct{}{}: default: } // non-blocking receive to ensure buffered messages are // delivered regardless of whether the link has been closed. q := r.messagesQ.Acquire() msg := q.Dequeue() r.messagesQ.Release(q) if msg == nil { return nil } debug.Log(3, "RX (Receiver %p): prefetched delivery ID %d", r, msg.deliveryID) if msg.settled { r.onSettlement(1) } return msg } // ReceiveOptions contains any optional values for the Receiver.Receive method. type ReceiveOptions struct { // for future expansion } // Receive returns the next message from the sender. // Blocks until a message is received, ctx completes, or an error occurs. // // Once a message is received, and if the sender is configured in any mode other // than SenderSettleModeSettled, you *must* take an action on the message by calling // one of the following: AcceptMessage, RejectMessage, ReleaseMessage, ModifyMessage. func (r *Receiver) Receive(ctx context.Context, opts *ReceiveOptions) (*Message, error) { if msg := r.Prefetched(); msg != nil { return msg, nil } // wait for the next message select { case q := <-r.messagesQ.Wait(): msg := q.Dequeue() debug.Assert(msg != nil) debug.Log(3, "RX (Receiver %p): received delivery ID %d", r, msg.deliveryID) r.messagesQ.Release(q) if msg.settled { r.onSettlement(1) } return msg, nil case <-r.l.done: // if the link receives messages and is then closed between the above call to r.Prefetched() // and this select statement, the order of selecting r.messages and r.l.done is undefined. // however, once r.l.done is closed the link cannot receive any more messages. so be sure to // drain any that might have trickled in within this window. if msg := r.Prefetched(); msg != nil { return msg, nil } return nil, r.l.doneErr case <-ctx.Done(): return nil, ctx.Err() } } // Accept notifies the server that the message has been accepted and does not require redelivery. // - ctx controls waiting for the peer to acknowledge the disposition // - msg is the message to accept // // If the context's deadline expires or is cancelled before the operation // completes, the message's disposition is in an unknown state. func (r *Receiver) AcceptMessage(ctx context.Context, msg *Message) error { return msg.rcv.messageDisposition(ctx, msg, &encoding.StateAccepted{}) } // Reject notifies the server that the message is invalid. // - ctx controls waiting for the peer to acknowledge the disposition // - msg is the message to reject // - e is an optional rejection error // // If the context's deadline expires or is cancelled before the operation // completes, the message's disposition is in an unknown state. func (r *Receiver) RejectMessage(ctx context.Context, msg *Message, e *Error) error { return msg.rcv.messageDisposition(ctx, msg, &encoding.StateRejected{Error: e}) } // Release releases the message back to the server. The message may be redelivered to this or another consumer. // - ctx controls waiting for the peer to acknowledge the disposition // - msg is the message to release // // If the context's deadline expires or is cancelled before the operation // completes, the message's disposition is in an unknown state. func (r *Receiver) ReleaseMessage(ctx context.Context, msg *Message) error { return msg.rcv.messageDisposition(ctx, msg, &encoding.StateReleased{}) } // Modify notifies the server that the message was not acted upon and should be modifed. // - ctx controls waiting for the peer to acknowledge the disposition // - msg is the message to modify // - options contains the optional settings to modify // // If the context's deadline expires or is cancelled before the operation // completes, the message's disposition is in an unknown state. func (r *Receiver) ModifyMessage(ctx context.Context, msg *Message, options *ModifyMessageOptions) error { if options == nil { options = &ModifyMessageOptions{} } return msg.rcv.messageDisposition(ctx, msg, &encoding.StateModified{ DeliveryFailed: options.DeliveryFailed, UndeliverableHere: options.UndeliverableHere, MessageAnnotations: options.Annotations, }) } // ModifyMessageOptions contains the optional parameters to ModifyMessage. type ModifyMessageOptions struct { // DeliveryFailed indicates that the server must consider this an // unsuccessful delivery attempt and increment the delivery count. DeliveryFailed bool // UndeliverableHere indicates that the server must not redeliver // the message to this link. UndeliverableHere bool // Annotations is an optional annotation map to be merged // with the existing message annotations, overwriting existing keys // if necessary. Annotations Annotations } // Address returns the link's address. func (r *Receiver) Address() string { if r.l.source == nil { return "" } return r.l.source.Address } // LinkName returns associated link name or an empty string if link is not defined. func (r *Receiver) LinkName() string { return r.l.key.name } // LinkSourceFilterValue retrieves the specified link source filter value or nil if it doesn't exist. func (r *Receiver) LinkSourceFilterValue(name string) any { if r.l.source == nil { return nil } filter, ok := r.l.source.Filter[encoding.Symbol(name)] if !ok { return nil } return filter.Value } // Properties returns the peer's link properties. // Returns nil if the peer didn't send any properties. func (r *Receiver) Properties() map[string]any { return r.l.peerProperties } // Close closes the Receiver and AMQP link. // - ctx controls waiting for the peer to acknowledge the close // // If the context's deadline expires or is cancelled before the operation // completes, an error is returned. However, the operation will continue to // execute in the background. Subsequent calls will return a *LinkError // that contains the context's error message. func (r *Receiver) Close(ctx context.Context) error { return r.l.closeLink(ctx) } // sendDisposition sends a disposition frame to the peer func (r *Receiver) sendDisposition(ctx context.Context, first uint32, last *uint32, state encoding.DeliveryState) error { fr := &frames.PerformDisposition{ Role: encoding.RoleReceiver, First: first, Last: last, Settled: r.l.receiverSettleMode == nil || *r.l.receiverSettleMode == ReceiverSettleModeFirst, State: state, } frameCtx := frameContext{ Ctx: ctx, Done: make(chan struct{}), } select { case r.txDisposition <- frameBodyEnvelope{FrameCtx: &frameCtx, FrameBody: fr}: debug.Log(2, "TX (Receiver %p): mux txDisposition %s", r, fr) case <-r.l.done: return r.l.doneErr } select { case <-frameCtx.Done: return frameCtx.Err case <-r.l.done: return r.l.doneErr } } // messageDisposition is called via the *Receiver associated with a *Message. // this allows messages to be settled across Receiver instances. // note that only unsettled messsages will have their rcv field set. func (r *Receiver) messageDisposition(ctx context.Context, msg *Message, state encoding.DeliveryState) error { // settling a message that's already settled (sender-settled or otherwise) will have a nil rcv. // which means that r will be nil. you MUST NOT dereference r if msg.settled == true if msg.settled { return nil } debug.Assert(r != nil) // NOTE: we MUST add to the in-flight map before sending the disposition. if not, it's possible // to receive the ack'ing disposition frame *before* the in-flight map has been updated which // will cause the below <-wait to never trigger. var wait chan error if r.l.receiverSettleMode != nil && *r.l.receiverSettleMode == ReceiverSettleModeSecond { debug.Log(3, "TX (Receiver %p): delivery ID %d is in flight", r, msg.deliveryID) wait = r.inFlight.add(msg) } if err := r.sendDisposition(ctx, msg.deliveryID, nil, state); err != nil { return err } if wait == nil { // mode first, there will be no settlement ack msg.onSettlement() r.deleteUnsettled() r.onSettlement(1) return nil } select { case err := <-wait: // err has three possibilities // - nil, meaning the peer acknowledged the settlement // - an *Error, meaning the peer rejected the message with a provided error // - a non-AMQP error. this comes from calls to inFlight.clear() during mux unwind. // only for the first two cases is the message considered settled if amqpErr := (&Error{}); err == nil || errors.As(err, &amqpErr) { debug.Log(3, "RX (Receiver %p): delivery ID %d has been settled", r, msg.deliveryID) // we've received confirmation of disposition return err } debug.Log(3, "RX (Receiver %p): error settling delivery ID %d: %v", r, msg.deliveryID, err) return err case <-ctx.Done(): // didn't receive the ack in the time allotted, leave message as unsettled // TODO: if the ack arrives later, we need to remove the message from the unsettled map and reclaim the credit return ctx.Err() } } // onSettlement is to be called after message settlement. // - count is the number of messages that were settled func (r *Receiver) onSettlement(count uint32) { if !r.autoSendFlow { return } r.settlementCountMu.Lock() r.settlementCount += count r.settlementCountMu.Unlock() select { case r.receiverReady <- struct{}{}: // woke up default: // wake pending } } // increments the count of unsettled messages. // this is only called from our mux. func (r *Receiver) addUnsettled() { atomic.AddInt32(&r.unsettledMessages, 1) } // decrements the count of unsettled messages. // this is called inside _or_ outside the mux. // it's called outside when RSM is mode first. func (r *Receiver) deleteUnsettled() { atomic.AddInt32(&r.unsettledMessages, -1) } // returns the count of unsettled messages. // this is only called from our mux for diagnostic purposes. func (r *Receiver) countUnsettled() int32 { return atomic.LoadInt32(&r.unsettledMessages) } func newReceiver(source string, session *Session, opts *ReceiverOptions) (*Receiver, error) { l := newLink(session, encoding.RoleReceiver) l.source = &frames.Source{Address: source} l.target = new(frames.Target) l.linkCredit = defaultLinkCredit r := &Receiver{ l: l, autoSendFlow: true, receiverReady: make(chan struct{}, 1), txDisposition: make(chan frameBodyEnvelope), } r.messagesQ = queue.NewHolder(queue.New[Message](int(session.incomingWindow))) if opts == nil { return r, nil } for _, v := range opts.Capabilities { r.l.target.Capabilities = append(r.l.target.Capabilities, encoding.Symbol(v)) } if opts.Credit > 0 { r.l.linkCredit = uint32(opts.Credit) } else if opts.Credit < 0 { r.l.linkCredit = 0 r.autoSendFlow = false } if opts.DesiredCapabilities != nil { r.l.desiredCapabilities = make([]encoding.Symbol, 0, len(opts.DesiredCapabilities)) for _, capabilityStr := range opts.DesiredCapabilities { r.l.desiredCapabilities = append(r.l.desiredCapabilities, encoding.Symbol(capabilityStr)) } } if opts.Durability > DurabilityUnsettledState { return nil, fmt.Errorf("invalid Durability %d", opts.Durability) } r.l.target.Durable = opts.Durability if opts.DynamicAddress { r.l.source.Address = "" r.l.dynamicAddr = opts.DynamicAddress } if opts.ExpiryPolicy != "" { if err := encoding.ValidateExpiryPolicy(opts.ExpiryPolicy); err != nil { return nil, err } r.l.target.ExpiryPolicy = opts.ExpiryPolicy } r.l.target.Timeout = opts.ExpiryTimeout if opts.Filters != nil { r.l.source.Filter = make(encoding.Filter) for _, f := range opts.Filters { f(r.l.source.Filter) } } if opts.MaxMessageSize > 0 { r.l.maxMessageSize = opts.MaxMessageSize } if opts.Name != "" { r.l.key.name = opts.Name } if opts.Properties != nil { r.l.properties = make(map[encoding.Symbol]any) for k, v := range opts.Properties { if k == "" { return nil, errors.New("link property key must not be empty") } r.l.properties[encoding.Symbol(k)] = v } } if opts.RequestedSenderSettleMode != nil { if rsm := *opts.RequestedSenderSettleMode; rsm > SenderSettleModeMixed { return nil, fmt.Errorf("invalid RequestedSenderSettleMode %d", rsm) } r.l.senderSettleMode = opts.RequestedSenderSettleMode } if opts.SettlementMode != nil { if rsm := *opts.SettlementMode; rsm > ReceiverSettleModeSecond { return nil, fmt.Errorf("invalid SettlementMode %d", rsm) } r.l.receiverSettleMode = opts.SettlementMode } r.l.target.Address = opts.TargetAddress for _, v := range opts.SourceCapabilities { r.l.source.Capabilities = append(r.l.source.Capabilities, encoding.Symbol(v)) } if opts.SourceDurability != DurabilityNone { r.l.source.Durable = opts.SourceDurability } if opts.SourceExpiryPolicy != ExpiryPolicySessionEnd { r.l.source.ExpiryPolicy = opts.SourceExpiryPolicy } if opts.SourceExpiryTimeout != 0 { r.l.source.Timeout = opts.SourceExpiryTimeout } return r, nil } // attach sends the Attach performative to establish the link with its parent session. // this is automatically called by the new*Link constructors. func (r *Receiver) attach(ctx context.Context) error { if err := r.l.attach(ctx, func(pa *frames.PerformAttach) { pa.Role = encoding.RoleReceiver if pa.Source == nil { pa.Source = new(frames.Source) } pa.Source.Dynamic = r.l.dynamicAddr }, func(pa *frames.PerformAttach) { if r.l.source == nil { r.l.source = new(frames.Source) } // if dynamic address requested, copy assigned name to address if r.l.dynamicAddr && pa.Source != nil { r.l.source.Address = pa.Source.Address } // deliveryCount is a sequence number, must initialize to sender's initial sequence number r.l.deliveryCount = pa.InitialDeliveryCount // copy the received filter values if pa.Source != nil { r.l.source.Filter = pa.Source.Filter } }); err != nil { return err } return nil } func nopHook() {} type receiverTestHooks struct { MuxStart func() MuxSelect func() } func (r *Receiver) mux(hooks receiverTestHooks) { if hooks.MuxSelect == nil { hooks.MuxSelect = nopHook } if hooks.MuxStart == nil { hooks.MuxStart = nopHook } defer func() { // unblock any in flight message dispositions r.inFlight.clear(r.l.doneErr) if !r.autoSendFlow { // unblock any pending drain requests r.creditor.EndDrain() } close(r.l.done) }() hooks.MuxStart() if r.autoSendFlow { r.l.doneErr = r.muxFlow(r.l.linkCredit, false) } for { msgLen := r.messagesQ.Len() r.settlementCountMu.Lock() // counter that accumulates the settled delivery count. // once the threshold has been reached, the counter is // reset and a flow frame is sent. previousSettlementCount := r.settlementCount if previousSettlementCount >= r.l.linkCredit { r.settlementCount = 0 } r.settlementCountMu.Unlock() // once we have pending credit equal to or greater than our available credit, reclaim it. // we do this instead of settlementCount > 0 to prevent flow frames from being too chatty. // NOTE: we compare the settlementCount against the current link credit instead of some // fixed threshold to ensure credit is reclaimed in cases where the number of unsettled // messages remains high for whatever reason. if r.autoSendFlow && previousSettlementCount > 0 && previousSettlementCount >= r.l.linkCredit { debug.Log(1, "RX (Receiver %p) (auto): source: %q, inflight: %d, linkCredit: %d, deliveryCount: %d, messages: %d, unsettled: %d, settlementCount: %d, settleMode: %s", r, r.l.source.Address, r.inFlight.len(), r.l.linkCredit, r.l.deliveryCount, msgLen, r.countUnsettled(), previousSettlementCount, r.l.receiverSettleMode.String()) r.l.doneErr = r.creditor.IssueCredit(previousSettlementCount) } else if r.l.linkCredit == 0 { debug.Log(1, "RX (Receiver %p) (pause): source: %q, inflight: %d, linkCredit: %d, deliveryCount: %d, messages: %d, unsettled: %d, settlementCount: %d, settleMode: %s", r, r.l.source.Address, r.inFlight.len(), r.l.linkCredit, r.l.deliveryCount, msgLen, r.countUnsettled(), previousSettlementCount, r.l.receiverSettleMode.String()) } if r.l.doneErr != nil { return } drain, credits := r.creditor.FlowBits(r.l.linkCredit) if drain || credits > 0 { debug.Log(1, "RX (Receiver %p) (flow): source: %q, inflight: %d, curLinkCredit: %d, newLinkCredit: %d, drain: %v, deliveryCount: %d, messages: %d, unsettled: %d, settlementCount: %d, settleMode: %s", r, r.l.source.Address, r.inFlight.len(), r.l.linkCredit, credits, drain, r.l.deliveryCount, msgLen, r.countUnsettled(), previousSettlementCount, r.l.receiverSettleMode.String()) // send a flow frame. r.l.doneErr = r.muxFlow(credits, drain) } if r.l.doneErr != nil { return } txDisposition := r.txDisposition closed := r.l.close if r.l.closeInProgress { // swap out channel so it no longer triggers closed = nil // disable sending of disposition frames once closing is in progress. // this is to prevent races between mux shutdown and clearing of // any in-flight dispositions. txDisposition = nil } hooks.MuxSelect() select { case q := <-r.l.rxQ.Wait(): // populated queue fr := *q.Dequeue() r.l.rxQ.Release(q) // if muxHandleFrame returns an error it means the mux must terminate. // note that in the case of a client-side close due to an error, nil // is returned in order to keep the mux running to ack the detach frame. if err := r.muxHandleFrame(fr); err != nil { r.l.doneErr = err return } case env := <-txDisposition: r.l.txFrame(env.FrameCtx, env.FrameBody) case <-r.receiverReady: continue case <-closed: if r.l.closeInProgress { // a client-side close due to protocol error is in progress continue } // receiver is being closed by the client r.l.closeInProgress = true fr := &frames.PerformDetach{ Handle: r.l.outputHandle, Closed: true, } r.l.txFrame(&frameContext{Ctx: context.Background()}, fr) case <-r.l.session.done: r.l.doneErr = r.l.session.doneErr return } } } // muxFlow sends tr to the session mux. // l.linkCredit will also be updated to `linkCredit` func (r *Receiver) muxFlow(linkCredit uint32, drain bool) error { var ( deliveryCount = r.l.deliveryCount ) fr := &frames.PerformFlow{ Handle: &r.l.outputHandle, DeliveryCount: &deliveryCount, LinkCredit: &linkCredit, // max number of messages, Drain: drain, } // Update credit. This must happen before entering loop below // because incoming messages handled while waiting to transmit // flow increment deliveryCount. This causes the credit to become // out of sync with the server. if !drain { // if we're draining we don't want to touch our internal credit - we're not changing it so any issued credits // are still valid until drain completes, at which point they will be naturally zeroed. r.l.linkCredit = linkCredit } select { case r.l.session.tx <- frameBodyEnvelope{FrameCtx: &frameContext{Ctx: context.Background()}, FrameBody: fr}: debug.Log(2, "TX (Receiver %p): mux frame to Session (%p): %d, %s", r, r.l.session, r.l.session.channel, fr) return nil case <-r.l.close: return nil case <-r.l.session.done: return r.l.session.doneErr } } // muxHandleFrame processes fr based on type. func (r *Receiver) muxHandleFrame(fr frames.FrameBody) error { debug.Log(2, "RX (Receiver %p): %s", r, fr) switch fr := fr.(type) { // message frame case *frames.PerformTransfer: r.muxReceive(*fr) // flow control frame case *frames.PerformFlow: if !fr.Echo { // if the 'drain' flag has been set in the frame sent to the _receiver_ then // we signal whomever is waiting (the service has seen and acknowledged our drain) if fr.Drain && !r.autoSendFlow { r.l.linkCredit = 0 // we have no active credits at this point. r.creditor.EndDrain() } return nil } var ( // copy because sent by pointer below; prevent race linkCredit = r.l.linkCredit deliveryCount = r.l.deliveryCount ) // send flow resp := &frames.PerformFlow{ Handle: &r.l.outputHandle, DeliveryCount: &deliveryCount, LinkCredit: &linkCredit, // max number of messages } select { case r.l.session.tx <- frameBodyEnvelope{FrameCtx: &frameContext{Ctx: context.Background()}, FrameBody: resp}: debug.Log(2, "TX (Receiver %p): mux frame to Session (%p): %d, %s", r, r.l.session, r.l.session.channel, resp) case <-r.l.close: return nil case <-r.l.session.done: return r.l.session.doneErr } case *frames.PerformDisposition: // Unblock receivers waiting for message disposition // bubble disposition error up to the receiver var dispositionError error if state, ok := fr.State.(*encoding.StateRejected); ok { // state.Error isn't required to be filled out. For instance if you dead letter a message // you will get a rejected response that doesn't contain an error. if state.Error != nil { dispositionError = state.Error } } // removal from the in-flight map will also remove the message from the unsettled map count := r.inFlight.remove(fr.First, fr.Last, dispositionError, func(msg *Message) { r.deleteUnsettled() msg.onSettlement() }) r.onSettlement(count) default: return r.l.muxHandleFrame(fr) } return nil } func (r *Receiver) muxReceive(fr frames.PerformTransfer) { if !r.more { // this is the first transfer of a message, // record the delivery ID, message format, // and delivery Tag if fr.DeliveryID != nil { r.msg.deliveryID = *fr.DeliveryID } if fr.MessageFormat != nil { r.msg.Format = *fr.MessageFormat } r.msg.DeliveryTag = fr.DeliveryTag // these fields are required on first transfer of a message if fr.DeliveryID == nil { r.l.closeWithError(ErrCondNotAllowed, "received message without a delivery-id") return } if fr.MessageFormat == nil { r.l.closeWithError(ErrCondNotAllowed, "received message without a message-format") return } if fr.DeliveryTag == nil { r.l.closeWithError(ErrCondNotAllowed, "received message without a delivery-tag") return } } else { // this is a continuation of a multipart message // some fields may be omitted on continuation transfers, // but if they are included they must be consistent // with the first. if fr.DeliveryID != nil && *fr.DeliveryID != r.msg.deliveryID { msg := fmt.Sprintf( "received continuation transfer with inconsistent delivery-id: %d != %d", *fr.DeliveryID, r.msg.deliveryID, ) r.l.closeWithError(ErrCondNotAllowed, msg) return } if fr.MessageFormat != nil && *fr.MessageFormat != r.msg.Format { msg := fmt.Sprintf( "received continuation transfer with inconsistent message-format: %d != %d", *fr.MessageFormat, r.msg.Format, ) r.l.closeWithError(ErrCondNotAllowed, msg) return } if fr.DeliveryTag != nil && !bytes.Equal(fr.DeliveryTag, r.msg.DeliveryTag) { msg := fmt.Sprintf( "received continuation transfer with inconsistent delivery-tag: %q != %q", fr.DeliveryTag, r.msg.DeliveryTag, ) r.l.closeWithError(ErrCondNotAllowed, msg) return } } // discard message if it's been aborted if fr.Aborted { r.msgBuf.Reset() r.msg = Message{} r.more = false return } // ensure maxMessageSize will not be exceeded if r.l.maxMessageSize != 0 && uint64(r.msgBuf.Len())+uint64(len(fr.Payload)) > r.l.maxMessageSize { r.l.closeWithError(ErrCondMessageSizeExceeded, fmt.Sprintf("received message larger than max size of %d", r.l.maxMessageSize)) return } // add the payload the the buffer r.msgBuf.Append(fr.Payload) // mark as settled if at least one frame is settled r.msg.settled = r.msg.settled || fr.Settled // save in-progress status r.more = fr.More if fr.More { return } // last frame in message err := r.msg.Unmarshal(&r.msgBuf) if err != nil { r.l.closeWithError(ErrCondInternalError, err.Error()) return } // send to receiver if !r.msg.settled { r.addUnsettled() r.msg.rcv = r debug.Log(3, "RX (Receiver %p): add unsettled delivery ID %d", r, r.msg.deliveryID) } q := r.messagesQ.Acquire() q.Enqueue(r.msg) msgLen := q.Len() r.messagesQ.Release(q) // reset progress r.msgBuf.Reset() r.msg = Message{} // decrement link-credit after entire message received r.l.deliveryCount++ r.l.linkCredit-- debug.Log(3, "RX (Receiver %p) link %s - deliveryCount: %d, linkCredit: %d, len(messages): %d", r, r.l.key.name, r.l.deliveryCount, r.l.linkCredit, msgLen) } // inFlight tracks in-flight message dispositions allowing receivers // to block waiting for the server to respond when an appropriate // settlement mode is configured. type inFlight struct { mu sync.RWMutex m map[uint32]inFlightInfo } type inFlightInfo struct { wait chan error msg *Message } func (f *inFlight) add(msg *Message) chan error { wait := make(chan error, 1) f.mu.Lock() if f.m == nil { f.m = make(map[uint32]inFlightInfo) } f.m[msg.deliveryID] = inFlightInfo{wait: wait, msg: msg} f.mu.Unlock() return wait } func (f *inFlight) remove(first uint32, last *uint32, err error, handler func(*Message)) uint32 { f.mu.Lock() if f.m == nil { f.mu.Unlock() return 0 } ll := first if last != nil { ll = *last } count := uint32(0) for i := first; i <= ll; i++ { info, ok := f.m[i] if ok { handler(info.msg) info.wait <- err delete(f.m, i) count++ } } f.mu.Unlock() return count } func (f *inFlight) clear(err error) { f.mu.Lock() for id, info := range f.m { info.wait <- err delete(f.m, id) } f.mu.Unlock() } func (f *inFlight) len() int { f.mu.RLock() defer f.mu.RUnlock() return len(f.m) }