link.go (260 lines of code) (raw):

package amqp import ( "context" "errors" "fmt" "sync" "github.com/Azure/go-amqp/internal/debug" "github.com/Azure/go-amqp/internal/encoding" "github.com/Azure/go-amqp/internal/frames" "github.com/Azure/go-amqp/internal/queue" "github.com/Azure/go-amqp/internal/shared" ) // linkKey uniquely identifies a link on a connection by name and direction. // // A link can be identified uniquely by the ordered tuple // // (source-container-id, target-container-id, name) // // On a single connection the container ID pairs can be abbreviated // to a boolean flag indicating the direction of the link. type linkKey struct { name string role encoding.Role // Local role: sender/receiver } // link contains the common state and methods for sending and receiving links type link struct { key linkKey // Name and direction // NOTE: outputHandle and inputHandle might not have the same value // our handle outputHandle uint32 // remote's handle inputHandle uint32 // frames destined for this link are added to this queue by Session.muxFrameToLink rxQ *queue.Holder[frames.FrameBody] // used for gracefully closing link close chan struct{} // signals a link's mux to shut down; DO NOT use this to check if a link has terminated, use done instead closeOnce *sync.Once // closeOnce protects close from being closed multiple times done chan struct{} // closed when the link has terminated (mux exited); DO NOT wait on this from within a link's mux() as it will never trigger! doneErr error // contains the mux error state; ONLY written to by the mux and MUST only be read from after done is closed! closeErr error // contains the error state returned from closeLink(); ONLY closeLink() reads/writes this! session *Session // parent session source *frames.Source // used for Receiver links target *frames.Target // used for Sender links properties map[encoding.Symbol]any // additional properties sent upon link attach // "The delivery-count is initialized by the sender when a link endpoint is created, // and is incremented whenever a message is sent. Only the sender MAY independently // modify this field. The receiver's value is calculated based on the last known // value from the sender and any subsequent messages received on the link. Note that, // despite its name, the delivery-count is not a count but a sequence number // initialized at an arbitrary point by the sender." deliveryCount uint32 // The current maximum number of messages that can be handled at the receiver endpoint of the link. Only the receiver endpoint // can independently set this value. The sender endpoint sets this to the last known value seen from the receiver. linkCredit uint32 // properties returned by the peer peerProperties map[string]any senderSettleMode *SenderSettleMode receiverSettleMode *ReceiverSettleMode maxMessageSize uint64 closeInProgress bool // indicates that the detach performative has been sent dynamicAddr bool // request a dynamic link address from the server desiredCapabilities encoding.MultiSymbol // maps to the ATTACH frame's desired-capabilities field } func newLink(s *Session, r encoding.Role) link { l := link{ key: linkKey{shared.RandString(40), r}, session: s, close: make(chan struct{}), closeOnce: &sync.Once{}, done: make(chan struct{}), } // set the segment size relative to respective window var segmentSize int if r == encoding.RoleReceiver { segmentSize = int(s.incomingWindow) } else { segmentSize = int(s.outgoingWindow) } l.rxQ = queue.NewHolder(queue.New[frames.FrameBody](segmentSize)) return l } // waitForFrame waits for an incoming frame to be queued. // it returns the next frame from the queue, or an error. // the error is either from the context or session.doneErr. // not meant for consumption outside of link.go. func (l *link) waitForFrame(ctx context.Context) (frames.FrameBody, error) { select { case <-ctx.Done(): return nil, ctx.Err() case <-l.session.done: // session has terminated, no need to deallocate in this case return nil, l.session.doneErr case q := <-l.rxQ.Wait(): // frame received fr := q.Dequeue() l.rxQ.Release(q) return *fr, nil } } // attach sends the Attach performative to establish the link with its parent session. // this is automatically called by the new*Link constructors. func (l *link) attach(ctx context.Context, beforeAttach func(*frames.PerformAttach), afterAttach func(*frames.PerformAttach)) error { if err := l.session.freeAbandonedLinks(ctx); err != nil { return err } // once the abandoned links have been cleaned up we can create our link if err := l.session.allocateHandle(ctx, l); err != nil { return err } attach := &frames.PerformAttach{ Name: l.key.name, Handle: l.outputHandle, ReceiverSettleMode: l.receiverSettleMode, SenderSettleMode: l.senderSettleMode, MaxMessageSize: l.maxMessageSize, Source: l.source, Target: l.target, Properties: l.properties, DesiredCapabilities: l.desiredCapabilities, } // link-specific configuration of the attach frame beforeAttach(attach) if err := l.txFrameAndWait(ctx, attach); err != nil { return err } // wait for response fr, err := l.waitForFrame(ctx) if err != nil { l.session.abandonLink(l) return err } resp, ok := fr.(*frames.PerformAttach) if !ok { debug.Log(1, "RX (link %p): unexpected attach response frame %T", l, fr) if err := l.session.conn.Close(); err != nil { return err } return &ConnError{inner: fmt.Errorf("unexpected attach response: %#v", fr)} } // If the remote encounters an error during the attach it returns an Attach // with no Source or Target. The remote then sends a Detach with an error. // // Note that if the application chooses not to create a terminus, the session // endpoint will still create a link endpoint and issue an attach indicating // that the link endpoint has no associated local terminus. In this case, the // session endpoint MUST immediately detach the newly created link endpoint. // // http://docs.oasis-open.org/amqp/core/v1.0/csprd01/amqp-core-transport-v1.0-csprd01.html#doc-idp386144 if resp.Source == nil && resp.Target == nil { // wait for detach fr, err := l.waitForFrame(ctx) if err != nil { // we timed out waiting for the peer to close the link, this really isn't an abandoned link. // however, we still need to send the detach performative to ack the peer. l.session.abandonLink(l) return err } detach, ok := fr.(*frames.PerformDetach) if !ok { if err := l.session.conn.Close(); err != nil { return err } return &ConnError{inner: fmt.Errorf("unexpected frame while waiting for detach: %#v", fr)} } // send return detach fr = &frames.PerformDetach{ Handle: l.outputHandle, Closed: true, } if err := l.txFrameAndWait(ctx, fr); err != nil { return err } if detach.Error == nil { return fmt.Errorf("received detach with no error specified") } return detach.Error } if l.maxMessageSize == 0 || resp.MaxMessageSize < l.maxMessageSize { l.maxMessageSize = resp.MaxMessageSize } // link-specific configuration post attach afterAttach(resp) if err := l.setSettleModes(resp); err != nil { // close the link as there's a mismatch on requested/supported settlement modes dr := &frames.PerformDetach{ Handle: l.outputHandle, Closed: true, } if err := l.txFrameAndWait(ctx, dr); err != nil { return err } return err } if len(resp.Properties) > 0 { l.peerProperties = map[string]any{} for k, v := range resp.Properties { l.peerProperties[string(k)] = v } } return nil } // setSettleModes sets the settlement modes based on the resp frames.PerformAttach. // // If a settlement mode has been explicitly set locally and it was not honored by the // server an error is returned. func (l *link) setSettleModes(resp *frames.PerformAttach) error { var ( localRecvSettle = receiverSettleModeValue(l.receiverSettleMode) respRecvSettle = receiverSettleModeValue(resp.ReceiverSettleMode) ) if l.receiverSettleMode != nil && localRecvSettle != respRecvSettle { return fmt.Errorf("amqp: receiver settlement mode %q requested, received %q from server", l.receiverSettleMode, &respRecvSettle) } l.receiverSettleMode = &respRecvSettle var ( localSendSettle = senderSettleModeValue(l.senderSettleMode) respSendSettle = senderSettleModeValue(resp.SenderSettleMode) ) if l.senderSettleMode != nil && localSendSettle != respSendSettle { return fmt.Errorf("amqp: sender settlement mode %q requested, received %q from server", l.senderSettleMode, &respSendSettle) } l.senderSettleMode = &respSendSettle return nil } // muxHandleFrame processes fr based on type. func (l *link) muxHandleFrame(fr frames.FrameBody) error { switch fr := fr.(type) { case *frames.PerformDetach: if !fr.Closed { l.closeWithError(ErrCondNotImplemented, fmt.Sprintf("non-closing detach not supported: %+v", fr)) return nil } // there are two possibilities: // - this is the ack to a client-side Close() // - the peer is closing the link so we must ack if l.closeInProgress { // if the client-side close was initiated due to an error (l.closeWithError) // then l.doneErr will already be set. in this case, return that error instead // of an empty LinkError which indicates a clean client-side close. if l.doneErr != nil { return l.doneErr } return &LinkError{} } dr := &frames.PerformDetach{ Handle: l.outputHandle, Closed: true, } l.txFrame(&frameContext{Ctx: context.Background()}, dr) return &LinkError{RemoteErr: fr.Error} default: debug.Log(1, "RX (link %p): unexpected frame: %s", l, fr) l.closeWithError(ErrCondInternalError, fmt.Sprintf("link received unexpected frame %T", fr)) return nil } } // Close closes the Sender and AMQP link. func (l *link) closeLink(ctx context.Context) error { var ctxErr error l.closeOnce.Do(func() { close(l.close) // once the mux has received the ack'ing detach performative, the mux will // exit which deletes the link and closes l.done. select { case <-l.done: l.closeErr = l.doneErr case <-ctx.Done(): // notify the caller that the close timed out/was cancelled. // the mux will remain running and once the ack is received it will terminate. ctxErr = ctx.Err() // record that the close timed out/was cancelled. // subsequent calls to closeLink() will return this debug.Log(1, "TX (link %p) closing %s: %v", l, l.key.name, ctxErr) l.closeErr = &LinkError{inner: ctxErr} } }) if ctxErr != nil { return ctxErr } var linkErr *LinkError if errors.As(l.closeErr, &linkErr) && linkErr.RemoteErr == nil && linkErr.inner == nil { // an empty LinkError means the link was cleanly closed by the caller return nil } return l.closeErr } // closeWithError initiates closing the link with the specified AMQP error. // the mux must continue to run until the ack'ing detach is received. // l.doneErr is populated with a &LinkError{} containing an inner error constructed from the specified values // - cnd is the AMQP error condition // - desc is the error description func (l *link) closeWithError(cnd ErrCond, desc string) { amqpErr := &Error{Condition: cnd, Description: desc} if l.closeInProgress { debug.Log(3, "TX (link %p) close error already pending, discarding %v", l, amqpErr) return } dr := &frames.PerformDetach{ Handle: l.outputHandle, Closed: true, Error: amqpErr, } l.closeInProgress = true l.doneErr = &LinkError{inner: fmt.Errorf("%s: %s", cnd, desc)} l.txFrame(&frameContext{Ctx: context.Background()}, dr) } // txFrame sends the specified frame via the link's session. // you MUST call this instead of session.txFrame() to ensure // that frames are not sent during session shutdown. func (l *link) txFrame(frameCtx *frameContext, fr frames.FrameBody) { // NOTE: there is no need to select on l.done as this is either // called from a link's mux or before the mux has even started. select { case <-l.session.done: // the link's session has terminated, let that propagate to the link's mux case <-l.session.endSent: // we swallow this to prevent the link's mux from terminating. // l.session.done will soon close so this is temporary. case l.session.tx <- frameBodyEnvelope{FrameCtx: frameCtx, FrameBody: fr}: debug.Log(2, "TX (link %p): mux frame to Session (%p): %s", l, l.session, fr) } } // txFrame sends the specified frame via the link's session. // you MUST call this instead of session.txFrame() to ensure // that frames are not sent during session shutdown. func (l *link) txFrameAndWait(ctx context.Context, fr frames.FrameBody) error { frameCtx := frameContext{ Ctx: ctx, Done: make(chan struct{}), } // NOTE: there is no need to select on l.done as this is either // called from a link's mux or before the mux has even started. select { case <-l.session.done: return l.session.doneErr case <-l.session.endSent: // we swallow this to prevent the link's mux from terminating. // l.session.done will soon close so this is temporary. return nil case l.session.tx <- frameBodyEnvelope{FrameCtx: &frameCtx, FrameBody: fr}: debug.Log(2, "TX (link %p): mux frame to Session (%p): %s", l, l.session, fr) } select { case <-frameCtx.Done: return frameCtx.Err case <-l.session.done: return l.session.doneErr } }