session.go (542 lines of code) (raw):

package amqp

import (
	"context"
	"errors"
	"fmt"
	"math"
	"sync"

	"github.com/Azure/go-amqp/internal/bitmap"
	"github.com/Azure/go-amqp/internal/debug"
	"github.com/Azure/go-amqp/internal/encoding"
	"github.com/Azure/go-amqp/internal/frames"
	"github.com/Azure/go-amqp/internal/queue"
)

// defaultWindow is the initial value used for both the incoming and
// outgoing session windows.
const defaultWindow = 5000

// SessionOptions contains the optional settings for configuring an AMQP session.
type SessionOptions struct {
	// MaxLinks sets the maximum number of links (Senders/Receivers)
	// allowed on the session.
	//
	// Minimum: 1.
	// Default: 4294967295.
	MaxLinks uint32
}

// Session is an AMQP session.
//
// A session multiplexes links (Senders and Receivers) over a single Conn.
type Session struct {
	channel       uint16                 // local channel number of this session
	remoteChannel uint16                 // remote channel number; owned by conn.connReader
	conn          *Conn                  // the connection this session runs on
	tx            chan frameBodyEnvelope // non-transfer frames to be sent; session must track disposition
	txTransfer    chan transferEnvelope  // transfer frames to be sent; session must track disposition

	// conn.connReader adds the frames destined for this session to this queue
	rxQ *queue.Holder[frames.FrameBody]

	// properties the peer returned in its begin performative
	peerProperties map[string]any

	// flow control
	incomingWindow uint32
	outgoingWindow uint32
	needFlowCount  uint32

	handleMax uint32

	// link management
	linksMu       sync.RWMutex      // synchronizes link handle allocation
	linksByKey    map[linkKey]*link // links keyed by name+role
	outputHandles *bitmap.Bitmap    // output handles currently allocated

	abandonedLinksMu sync.Mutex
	abandonedLinks   []*link

	// used for gracefully closing session
	close     chan struct{} // closed by Close(); tells the mux that the end performative should be sent
	closeOnce sync.Once

	// part of internal public surface area
	done     chan struct{} // closed when the session has terminated (mux exited); DO NOT wait on this from within Session.mux() as it will never trigger!
	endSent  chan struct{} // closed when the end performative has been sent; once this is closed, links MUST NOT send any frames!
	doneErr  error         // the mux error state; ONLY written to by the mux and MUST only be read from after done is closed!
	closeErr error         // the error state returned from Close(); ONLY Close() reads/writes this!
}

// newSession builds a Session bound to conn c on the given local channel.
// A nil opts, or a zero opts.MaxLinks, selects the default handle maximum.
// The session's mux is not started here.
func newSession(c *Conn, channel uint16, opts *SessionOptions) *Session {
	// MaxLinks counts links while handleMax is the largest handle ID.
	// handle IDs start at zero, hence the decrement.
	handleMax := uint32(math.MaxUint32 - 1)
	if opts != nil && opts.MaxLinks != 0 {
		handleMax = opts.MaxLinks - 1
	}

	return &Session{
		conn:           c,
		channel:        channel,
		tx:             make(chan frameBodyEnvelope),
		txTransfer:     make(chan transferEnvelope),
		incomingWindow: defaultWindow,
		outgoingWindow: defaultWindow,
		handleMax:      handleMax,
		linksByKey:     make(map[linkKey]*link),
		close:          make(chan struct{}),
		done:           make(chan struct{}),
		endSent:        make(chan struct{}),

		// the handle bitmap is sized only after the options were applied
		outputHandles: bitmap.New(handleMax),

		// the receive queue is sized to the incoming window
		rxQ: queue.NewHolder(queue.New[frames.FrameBody](defaultWindow)),
	}
}

// waitForFrame waits for an incoming frame to be queued.
// it returns the next frame from the queue, or an error.
// the error is either from the context or conn.doneErr.
// not meant for consumption outside of session.go.
func (s *Session) waitForFrame(ctx context.Context) (frames.FrameBody, error) { var q *queue.Queue[frames.FrameBody] select { case <-ctx.Done(): return nil, ctx.Err() case <-s.conn.done: return nil, s.conn.doneErr case q = <-s.rxQ.Wait(): // populated queue } fr := q.Dequeue() s.rxQ.Release(q) return *fr, nil } func (s *Session) begin(ctx context.Context) error { // send Begin to server begin := &frames.PerformBegin{ NextOutgoingID: 0, IncomingWindow: s.incomingWindow, OutgoingWindow: s.outgoingWindow, HandleMax: s.handleMax, } if err := s.txFrameAndWait(ctx, begin); err != nil { return err } // wait for response fr, err := s.waitForFrame(ctx) if err != nil { // if we exit before receiving the ack, our caller will clean up the channel. // however, it does mean that the peer will now have assigned an outgoing // channel ID that's not in use. return err } begin, ok := fr.(*frames.PerformBegin) if !ok { // this codepath is hard to hit (impossible?). if the response isn't a PerformBegin and we've not // yet seen the remote channel number, the default clause in conn.connReader will protect us from that. // if we have seen the remote channel number then it's likely the session.mux for that channel will // either swallow the frame or blow up in some other way, both causing this call to hang. // deallocate session on error. we can't call // s.Close() as the session mux hasn't started yet. debug.Log(1, "RX (Session %p): unexpected begin response frame %T", s, fr) s.conn.deleteSession(s) if err := s.conn.Close(); err != nil { return err } return &ConnError{inner: fmt.Errorf("unexpected begin response: %#v", fr)} } if len(begin.Properties) > 0 { s.peerProperties = map[string]any{} for k, v := range begin.Properties { s.peerProperties[string(k)] = v } } // start Session multiplexor go s.mux(begin) return nil } // Close closes the session. 
// - ctx controls waiting for the peer to acknowledge the session is closed // // If the context's deadline expires or is cancelled before the operation // completes, an error is returned. However, the operation will continue to // execute in the background. Subsequent calls will return a *SessionError // that contains the context's error message. func (s *Session) Close(ctx context.Context) error { var ctxErr error s.closeOnce.Do(func() { close(s.close) // once the mux has received the ack'ing end performative, the mux will // exit which deletes the session and closes s.done. select { case <-s.done: s.closeErr = s.doneErr case <-ctx.Done(): // notify the caller that the close timed out/was cancelled. // the mux will remain running and once the ack is received it will terminate. ctxErr = ctx.Err() // record that the close timed out/was cancelled. // subsequent calls to Close() will return this debug.Log(1, "TX (Session %p) channel %d: %v", s, s.channel, ctxErr) s.closeErr = &SessionError{inner: ctxErr} } }) if ctxErr != nil { return ctxErr } var sessionErr *SessionError if errors.As(s.closeErr, &sessionErr) && sessionErr.RemoteErr == nil && sessionErr.inner == nil { // an empty SessionError means the session was cleanly closed by the caller return nil } return s.closeErr } // txFrame sends a frame to the connWriter. 
// - ctx is used to provide the write deadline // - fr is the frame to write to net.Conn func (s *Session) txFrame(frameCtx *frameContext, fr frames.FrameBody) { debug.Log(2, "TX (Session %p) mux frame to Conn (%p): %s", s, s.conn, fr) s.conn.sendFrame(frameEnvelope{ FrameCtx: frameCtx, Frame: frames.Frame{ Type: frames.TypeAMQP, Channel: s.channel, Body: fr, }, }) } // txFrameAndWait sends a frame to the connWriter and waits for the write to complete // - ctx is used to provide the write deadline // - fr is the frame to write to net.Conn func (s *Session) txFrameAndWait(ctx context.Context, fr frames.FrameBody) error { frameCtx := frameContext{ Ctx: ctx, Done: make(chan struct{}), } s.txFrame(&frameCtx, fr) select { case <-frameCtx.Done: return frameCtx.Err case <-s.conn.done: return s.conn.doneErr case <-s.done: return s.doneErr } } // NewReceiver opens a new receiver link on the session. // - ctx controls waiting for the peer to create a sending terminus // - source is the name of the peer's sending terminus // - opts contains optional values, pass nil to accept the defaults // // If the context's deadline expires or is cancelled before the operation // completes, an error is returned. If the Receiver was successfully // created, it will be cleaned up in future calls to NewReceiver. func (s *Session) NewReceiver(ctx context.Context, source string, opts *ReceiverOptions) (*Receiver, error) { return newReceiverForSession(ctx, s, source, opts, receiverTestHooks{}) } // split out so tests can add hooks func newReceiverForSession(ctx context.Context, s *Session, source string, opts *ReceiverOptions, hooks receiverTestHooks) (*Receiver, error) { r, err := newReceiver(source, s, opts) if err != nil { return nil, err } if err = r.attach(ctx); err != nil { return nil, err } go r.mux(hooks) return r, nil } // NewSender opens a new sender link on the session. 
// - ctx controls waiting for the peer to create a receiver terminus // - target is the name of the peer's receiver terminus // - opts contains optional values, pass nil to accept the defaults // // If the context's deadline expires or is cancelled before the operation // completes, an error is returned. If the Sender was successfully // created, it will be cleaned up in future calls to NewSender. func (s *Session) NewSender(ctx context.Context, target string, opts *SenderOptions) (*Sender, error) { return newSenderForSession(ctx, s, target, opts, senderTestHooks{}) } // Properties returns the peer's session properties. // Returns nil if the peer didn't send any properties. func (s *Session) Properties() map[string]any { return s.peerProperties } // split out so tests can add hooks func newSenderForSession(ctx context.Context, s *Session, target string, opts *SenderOptions, hooks senderTestHooks) (*Sender, error) { l, err := newSender(target, s, opts) if err != nil { return nil, err } if err = l.attach(ctx); err != nil { return nil, err } go l.mux(hooks) return l, nil } func (s *Session) mux(remoteBegin *frames.PerformBegin) { defer func() { if s.doneErr == nil { s.doneErr = &SessionError{} } else if connErr := (&ConnError{}); !errors.As(s.doneErr, &connErr) { // only wrap non-ConnError error types var amqpErr *Error if errors.As(s.doneErr, &amqpErr) { s.doneErr = &SessionError{RemoteErr: amqpErr} } else { s.doneErr = &SessionError{inner: s.doneErr} } } // Signal goroutines waiting on the session. close(s.done) }() var ( // maps input (remote) handles to links linkFromInputHandle = make(map[uint32]*link) // maps local delivery IDs (sending transfers) to input (remote) handles inputHandleFromDeliveryID = make(map[uint32]uint32) // maps remote delivery IDs (receiving transfers) to input (remote) handles inputHandleFromRemoteDeliveryID = make(map[uint32]uint32) // maps delivery IDs to output (our) handles. 
used for multi-frame transfers deliveryIDFromOutputHandle = make(map[uint32]uint32) // maps delivery IDs to the settlement state channel settlementFromDeliveryID = make(map[uint32]chan encoding.DeliveryState) // tracks the next delivery ID for outgoing transfers nextDeliveryID uint32 // flow control values nextOutgoingID uint32 nextIncomingID = remoteBegin.NextOutgoingID remoteIncomingWindow = remoteBegin.IncomingWindow remoteOutgoingWindow = remoteBegin.OutgoingWindow closeInProgress bool // indicates the end performative has been sent ) closeWithError := func(e1 *Error, e2 error) { if closeInProgress { debug.Log(3, "TX (Session %p): close already pending, discarding %v", s, e1) return } closeInProgress = true s.doneErr = e2 s.txFrame(&frameContext{Ctx: context.Background()}, &frames.PerformEnd{Error: e1}) close(s.endSent) } for { txTransfer := s.txTransfer // disable txTransfer if flow control windows have been exceeded if remoteIncomingWindow == 0 || s.outgoingWindow == 0 { debug.Log(1, "TX (Session %p): disabling txTransfer - window exceeded. remoteIncomingWindow: %d outgoingWindow: %d", s, remoteIncomingWindow, s.outgoingWindow) txTransfer = nil } tx := s.tx closed := s.close if closeInProgress { // swap out channel so it no longer triggers closed = nil // once the end performative is sent, we're not allowed to send any frames tx = nil txTransfer = nil } // notes on client-side closing session // when session is closed, we must keep the mux running until the ack'ing end performative // has been received. during this window, the session is allowed to receive frames but cannot // send them. // client-side close happens either by user calling Session.Close() or due to mux initiated // close due to a violation of some invariant (see sending &Error{} to s.close). in the case // that both code paths have been triggered, we must be careful to preserve the error that // triggered the mux initiated close so it can be surfaced to the caller. 
select { // conn has completed, exit case <-s.conn.done: s.doneErr = s.conn.doneErr return case <-closed: if closeInProgress { // a client-side close due to protocol error is in progress continue } // session is being closed by the client closeInProgress = true s.txFrame(&frameContext{Ctx: context.Background()}, &frames.PerformEnd{}) close(s.endSent) // incoming frame case q := <-s.rxQ.Wait(): fr := *q.Dequeue() s.rxQ.Release(q) debug.Log(2, "RX (Session %p): %s", s, fr) switch body := fr.(type) { // Disposition frames can reference transfers from more than one // link. Send this frame to all of them. case *frames.PerformDisposition: start := body.First end := start if body.Last != nil { end = *body.Last } for deliveryID := start; deliveryID <= end; deliveryID++ { // find the input (remote) handle for this delivery ID. // default to the map for local delivery IDs. handles := inputHandleFromDeliveryID if body.Role == encoding.RoleSender { // the disposition frame is meant for a receiver // so look in the map for remote delivery IDs. 
handles = inputHandleFromRemoteDeliveryID } inputHandle, ok := handles[deliveryID] if !ok { debug.Log(2, "RX (Session %p): role %s: didn't find deliveryID %d in inputHandlesByDeliveryID map", s, body.Role, deliveryID) continue } delete(handles, deliveryID) if body.Settled && body.Role == encoding.RoleReceiver { // check if settlement confirmation was requested, if so // confirm by closing channel (RSM == ModeFirst) if done, ok := settlementFromDeliveryID[deliveryID]; ok { delete(settlementFromDeliveryID, deliveryID) select { case done <- body.State: default: } close(done) } } // now find the *link for this input (remote) handle link, ok := linkFromInputHandle[inputHandle] if !ok { closeWithError(&Error{ Condition: ErrCondUnattachedHandle, Description: "received disposition frame referencing a handle that's not in use", }, fmt.Errorf("received disposition frame with unknown link input handle %d", inputHandle)) continue } s.muxFrameToLink(link, fr) } continue case *frames.PerformFlow: if body.NextIncomingID == nil { // This is a protocol error: // "[...] MUST be set if the peer has received // the begin frame for the session" closeWithError(&Error{ Condition: ErrCondNotAllowed, Description: "next-incoming-id not set after session established", }, errors.New("protocol error: received flow without next-incoming-id after session established")) continue } // "When the endpoint receives a flow frame from its peer, // it MUST update the next-incoming-id directly from the // next-outgoing-id of the frame, and it MUST update the // remote-outgoing-window directly from the outgoing-window // of the frame." 
nextIncomingID = body.NextOutgoingID remoteOutgoingWindow = body.OutgoingWindow // "The remote-incoming-window is computed as follows: // // next-incoming-id(flow) + incoming-window(flow) - next-outgoing-id(endpoint) // // If the next-incoming-id field of the flow frame is not set, then remote-incoming-window is computed as follows: // // initial-outgoing-id(endpoint) + incoming-window(flow) - next-outgoing-id(endpoint)" remoteIncomingWindow = body.IncomingWindow - nextOutgoingID remoteIncomingWindow += *body.NextIncomingID debug.Log(3, "RX (Session %p): flow - remoteOutgoingWindow: %d remoteIncomingWindow: %d nextOutgoingID: %d", s, remoteOutgoingWindow, remoteIncomingWindow, nextOutgoingID) // Send to link if handle is set if body.Handle != nil { link, ok := linkFromInputHandle[*body.Handle] if !ok { closeWithError(&Error{ Condition: ErrCondUnattachedHandle, Description: "received flow frame referencing a handle that's not in use", }, fmt.Errorf("received flow frame with unknown link handle %d", body.Handle)) continue } s.muxFrameToLink(link, fr) continue } if body.Echo && !closeInProgress { niID := nextIncomingID resp := &frames.PerformFlow{ NextIncomingID: &niID, IncomingWindow: s.incomingWindow, NextOutgoingID: nextOutgoingID, OutgoingWindow: s.outgoingWindow, } s.txFrame(&frameContext{Ctx: context.Background()}, resp) } case *frames.PerformAttach: // On Attach response link should be looked up by name, then added // to the links map with the remote's handle contained in this // attach frame. // // Note body.Role is the remote peer's role, we reverse for the local key. s.linksMu.RLock() link, linkOk := s.linksByKey[linkKey{name: body.Name, role: !body.Role}] s.linksMu.RUnlock() if !linkOk { closeWithError(&Error{ Condition: ErrCondNotAllowed, Description: "received mismatched attach frame", }, fmt.Errorf("protocol error: received mismatched attach frame %+v", body)) continue } // track the input (remote) handle number for this link. 
// note that it might be a different value than our output handle. link.inputHandle = body.Handle linkFromInputHandle[link.inputHandle] = link s.muxFrameToLink(link, fr) debug.Log(1, "RX (Session %p): link %s attached, input handle %d, output handle %d", s, link.key.name, link.inputHandle, link.outputHandle) case *frames.PerformTransfer: s.needFlowCount++ // "Upon receiving a transfer, the receiving endpoint will // increment the next-incoming-id to match the implicit // transfer-id of the incoming transfer plus one, as well // as decrementing the remote-outgoing-window, and MAY // (depending on policy) decrement its incoming-window." nextIncomingID++ // don't loop to intmax if remoteOutgoingWindow > 0 { remoteOutgoingWindow-- } link, ok := linkFromInputHandle[body.Handle] if !ok { closeWithError(&Error{ Condition: ErrCondUnattachedHandle, Description: "received transfer frame referencing a handle that's not in use", }, fmt.Errorf("received transfer frame with unknown link handle %d", body.Handle)) continue } s.muxFrameToLink(link, fr) // if this message is received unsettled and link rcv-settle-mode == second, add to handlesByRemoteDeliveryID if !body.Settled && body.DeliveryID != nil && link.receiverSettleMode != nil && *link.receiverSettleMode == ReceiverSettleModeSecond { debug.Log(1, "RX (Session %p): adding handle %d to inputHandleFromRemoteDeliveryID. remote delivery ID: %d", s, body.Handle, *body.DeliveryID) inputHandleFromRemoteDeliveryID[*body.DeliveryID] = body.Handle } // Update peer's outgoing window if half has been consumed. 
if s.needFlowCount >= s.incomingWindow/2 && !closeInProgress { debug.Log(3, "RX (Session %p): channel %d: flow - s.needFlowCount(%d) >= s.incomingWindow(%d)/2\n", s, s.channel, s.needFlowCount, s.incomingWindow) s.needFlowCount = 0 nID := nextIncomingID flow := &frames.PerformFlow{ NextIncomingID: &nID, IncomingWindow: s.incomingWindow, NextOutgoingID: nextOutgoingID, OutgoingWindow: s.outgoingWindow, } s.txFrame(&frameContext{Ctx: context.Background()}, flow) } case *frames.PerformDetach: link, ok := linkFromInputHandle[body.Handle] if !ok { closeWithError(&Error{ Condition: ErrCondUnattachedHandle, Description: "received detach frame referencing a handle that's not in use", }, fmt.Errorf("received detach frame with unknown link handle %d", body.Handle)) continue } s.muxFrameToLink(link, fr) // we received a detach frame and sent it to the link. // this was either the response to a client-side initiated // detach or our peer detached us. either way, now that // the link has processed the frame it's detached so we // are safe to clean up its state. 
delete(linkFromInputHandle, link.inputHandle) delete(deliveryIDFromOutputHandle, link.outputHandle) s.deallocateHandle(link) case *frames.PerformEnd: // there are two possibilities: // - this is the ack to a client-side Close() // - the peer is ending the session so we must ack if closeInProgress { return } // peer detached us with an error, save it and send the ack if body.Error != nil { s.doneErr = body.Error } fr := frames.PerformEnd{} s.txFrame(&frameContext{Ctx: context.Background()}, &fr) // per spec, when end is received, we're no longer allowed to receive frames return default: debug.Log(1, "RX (Session %p): unexpected frame: %s\n", s, body) closeWithError(&Error{ Condition: ErrCondInternalError, Description: "session received unexpected frame", }, fmt.Errorf("internal error: unexpected frame %T", body)) } case env := <-txTransfer: fr := &env.Frame // record current delivery ID var deliveryID uint32 if fr.DeliveryID == &needsDeliveryID { deliveryID = nextDeliveryID fr.DeliveryID = &deliveryID nextDeliveryID++ deliveryIDFromOutputHandle[fr.Handle] = deliveryID if !fr.Settled { inputHandleFromDeliveryID[deliveryID] = env.InputHandle } } else { // if fr.DeliveryID is nil it must have been added // to deliveryIDByHandle already (multi-frame transfer) deliveryID = deliveryIDFromOutputHandle[fr.Handle] } // log after the delivery ID has been assigned debug.Log(2, "TX (Session %p): %d, %s", s, s.channel, fr) // frame has been sender-settled, remove from map. // this should only come into play for multi-frame transfers. 
if fr.Settled { delete(inputHandleFromDeliveryID, deliveryID) } s.txFrame(env.FrameCtx, fr) select { case <-env.FrameCtx.Done: if env.FrameCtx.Err != nil { // transfer wasn't sent, don't update state continue } // transfer was written to the network case <-s.conn.done: // the write failed, Conn is going down continue } // if not settled, add done chan to map if !fr.Settled && fr.Done != nil { settlementFromDeliveryID[deliveryID] = fr.Done } else if fr.Done != nil { // sender-settled, close done now that the transfer has been sent close(fr.Done) } // "Upon sending a transfer, the sending endpoint will increment // its next-outgoing-id, decrement its remote-incoming-window, // and MAY (depending on policy) decrement its outgoing-window." nextOutgoingID++ // don't decrement if we're at 0 or we could loop to int max if remoteIncomingWindow != 0 { remoteIncomingWindow-- } case env := <-tx: fr := env.FrameBody debug.Log(2, "TX (Session %p): %d, %s", s, s.channel, fr) switch fr := env.FrameBody.(type) { case *frames.PerformDisposition: if fr.Settled && fr.Role == encoding.RoleSender { // sender with a peer that's in mode second; sending confirmation of disposition. // disposition frames can reference a range of delivery IDs, although it's highly // likely in this case there will only be one. 
start := fr.First end := start if fr.Last != nil { end = *fr.Last } for deliveryID := start; deliveryID <= end; deliveryID++ { // send delivery state to the channel and close it to signal // that the delivery has completed (RSM == ModeSecond) if done, ok := settlementFromDeliveryID[deliveryID]; ok { delete(settlementFromDeliveryID, deliveryID) select { case done <- fr.State: default: } close(done) } } } s.txFrame(env.FrameCtx, fr) case *frames.PerformFlow: niID := nextIncomingID fr.NextIncomingID = &niID fr.IncomingWindow = s.incomingWindow fr.NextOutgoingID = nextOutgoingID fr.OutgoingWindow = s.outgoingWindow s.txFrame(env.FrameCtx, fr) case *frames.PerformTransfer: panic("transfer frames must use txTransfer") default: s.txFrame(env.FrameCtx, fr) } } } } func (s *Session) allocateHandle(ctx context.Context, l *link) error { s.linksMu.Lock() defer s.linksMu.Unlock() // Check if link name already exists, if so then an error should be returned existing := s.linksByKey[l.key] if existing != nil { return fmt.Errorf("link with name '%v' already exists", l.key.name) } next, ok := s.outputHandles.Next() if !ok { if err := s.Close(ctx); err != nil { return err } // handle numbers are zero-based, report the actual count return &SessionError{inner: fmt.Errorf("reached session handle max (%d)", s.handleMax+1)} } l.outputHandle = next // allocate handle to the link s.linksByKey[l.key] = l // add to mapping return nil } func (s *Session) deallocateHandle(l *link) { s.linksMu.Lock() defer s.linksMu.Unlock() delete(s.linksByKey, l.key) s.outputHandles.Remove(l.outputHandle) } func (s *Session) abandonLink(l *link) { s.abandonedLinksMu.Lock() defer s.abandonedLinksMu.Unlock() s.abandonedLinks = append(s.abandonedLinks, l) } func (s *Session) freeAbandonedLinks(ctx context.Context) error { s.abandonedLinksMu.Lock() defer s.abandonedLinksMu.Unlock() debug.Log(3, "TX (Session %p): cleaning up %d abandoned links", s, len(s.abandonedLinks)) for _, l := range s.abandonedLinks { dr := 
&frames.PerformDetach{ Handle: l.outputHandle, Closed: true, } if err := s.txFrameAndWait(ctx, dr); err != nil { return err } } s.abandonedLinks = nil return nil } func (s *Session) muxFrameToLink(l *link, fr frames.FrameBody) { q := l.rxQ.Acquire() q.Enqueue(fr) l.rxQ.Release(q) debug.Log(2, "RX (Session %p): mux frame to link (%p): %s, %s", s, l, l.key.name, fr) } // transferEnvelope is used by senders to send transfer frames type transferEnvelope struct { FrameCtx *frameContext // the link's remote handle InputHandle uint32 Frame frames.PerformTransfer } // frameBodyEnvelope is used by senders and receivers to send frames. type frameBodyEnvelope struct { FrameCtx *frameContext FrameBody frames.FrameBody } // the address of this var is a sentinel value indicating // that a transfer frame is in need of a delivery ID var needsDeliveryID uint32