in session.go [317:752]
// mux is the session's frame multiplexer loop. It returns only when the
// owning connection has completed (s.conn.done) or an end performative has
// been received from the peer. On return, s.doneErr is normalized and s.done
// is closed.
//
// Each iteration of the loop services exactly one of the following:
//   - s.conn.done: the connection is finished; its error is recorded and mux exits.
//   - s.close: a client-side close request; an end performative is sent and
//     mux keeps running (receive-only) until the peer's end arrives.
//   - s.rxQ: an incoming frame, which updates session flow-control state and
//     is routed to the link identified by its input (remote) handle.
//   - s.txTransfer: an outgoing transfer frame; disabled while either the
//     remote incoming window or s.outgoingWindow is zero.
//   - s.tx: any other outgoing frame.
//
// remoteBegin is the peer's begin performative; its NextOutgoingID,
// IncomingWindow and OutgoingWindow seed the session's flow-control state.
//
// All handle/delivery-ID bookkeeping maps are local to this function.
func (s *Session) mux(remoteBegin *frames.PerformBegin) {
	defer func() {
		// Normalize the terminal error:
		//   - nil becomes an empty *SessionError
		//   - a *ConnError is left untouched
		//   - an *Error is surfaced as SessionError.RemoteErr
		//   - anything else is wrapped as SessionError.inner
		if s.doneErr == nil {
			s.doneErr = &SessionError{}
		} else if connErr := (&ConnError{}); !errors.As(s.doneErr, &connErr) {
			// only wrap non-ConnError error types
			var amqpErr *Error
			if errors.As(s.doneErr, &amqpErr) {
				s.doneErr = &SessionError{RemoteErr: amqpErr}
			} else {
				s.doneErr = &SessionError{inner: s.doneErr}
			}
		}
		// Signal goroutines waiting on the session.
		close(s.done)
	}()

	var (
		// maps input (remote) handles to links
		linkFromInputHandle = make(map[uint32]*link)

		// maps local delivery IDs (sending transfers) to input (remote) handles
		inputHandleFromDeliveryID = make(map[uint32]uint32)

		// maps remote delivery IDs (receiving transfers) to input (remote) handles
		inputHandleFromRemoteDeliveryID = make(map[uint32]uint32)

		// maps delivery IDs to output (our) handles. used for multi-frame transfers
		deliveryIDFromOutputHandle = make(map[uint32]uint32)

		// maps delivery IDs to the settlement state channel
		settlementFromDeliveryID = make(map[uint32]chan encoding.DeliveryState)

		// tracks the next delivery ID for outgoing transfers
		nextDeliveryID uint32

		// flow control values
		nextOutgoingID       uint32
		nextIncomingID       = remoteBegin.NextOutgoingID
		remoteIncomingWindow = remoteBegin.IncomingWindow
		remoteOutgoingWindow = remoteBegin.OutgoingWindow

		closeInProgress bool // indicates the end performative has been sent
	)

	// closeWithError starts a mux-initiated close: it records e2 as the
	// session's terminal error, sends an end performative carrying e1 to the
	// peer and closes s.endSent. Only the first call has any effect; later
	// calls are logged and discarded so the original error is preserved.
	closeWithError := func(e1 *Error, e2 error) {
		if closeInProgress {
			debug.Log(3, "TX (Session %p): close already pending, discarding %v", s, e1)
			return
		}

		closeInProgress = true
		s.doneErr = e2
		s.txFrame(&frameContext{Ctx: context.Background()}, &frames.PerformEnd{Error: e1})
		close(s.endSent)
	}

	for {
		txTransfer := s.txTransfer
		// disable txTransfer if flow control windows have been exceeded
		if remoteIncomingWindow == 0 || s.outgoingWindow == 0 {
			debug.Log(1, "TX (Session %p): disabling txTransfer - window exceeded. remoteIncomingWindow: %d outgoingWindow: %d",
				s, remoteIncomingWindow, s.outgoingWindow)
			txTransfer = nil
		}

		tx := s.tx
		closed := s.close
		if closeInProgress {
			// swap out channel so it no longer triggers
			closed = nil

			// once the end performative is sent, we're not allowed to send any frames
			tx = nil
			txTransfer = nil
		}

		// notes on client-side closing session
		// when session is closed, we must keep the mux running until the ack'ing end performative
		// has been received. during this window, the session is allowed to receive frames but cannot
		// send them.
		// client-side close happens either by user calling Session.Close() or due to mux initiated
		// close due to a violation of some invariant (see sending &Error{} to s.close). in the case
		// that both code paths have been triggered, we must be careful to preserve the error that
		// triggered the mux initiated close so it can be surfaced to the caller.

		select {
		// conn has completed, exit
		case <-s.conn.done:
			s.doneErr = s.conn.doneErr
			return

		case <-closed:
			if closeInProgress {
				// a client-side close due to protocol error is in progress
				continue
			}
			// session is being closed by the client
			closeInProgress = true
			s.txFrame(&frameContext{Ctx: context.Background()}, &frames.PerformEnd{})
			close(s.endSent)

		// incoming frame
		case q := <-s.rxQ.Wait():
			fr := *q.Dequeue()
			s.rxQ.Release(q)
			debug.Log(2, "RX (Session %p): %s", s, fr)

			switch body := fr.(type) {
			// Disposition frames can reference transfers from more than one
			// link. Send this frame to all of them.
			case *frames.PerformDisposition:
				start := body.First
				end := start
				if body.Last != nil {
					end = *body.Last
				}
				// iterate with a 64-bit counter: a uint32 counter would wrap to
				// zero after math.MaxUint32 and never terminate when end is
				// math.MaxUint32.
				for id := uint64(start); id <= uint64(end); id++ {
					deliveryID := uint32(id)

					// find the input (remote) handle for this delivery ID.
					// default to the map for local delivery IDs.
					handles := inputHandleFromDeliveryID
					if body.Role == encoding.RoleSender {
						// the disposition frame is meant for a receiver
						// so look in the map for remote delivery IDs.
						handles = inputHandleFromRemoteDeliveryID
					}

					inputHandle, ok := handles[deliveryID]
					if !ok {
						debug.Log(2, "RX (Session %p): role %s: didn't find deliveryID %d in inputHandlesByDeliveryID map", s, body.Role, deliveryID)
						continue
					}
					delete(handles, deliveryID)

					if body.Settled && body.Role == encoding.RoleReceiver {
						// check if settlement confirmation was requested, if so
						// confirm by closing channel (RSM == ModeFirst)
						if done, ok := settlementFromDeliveryID[deliveryID]; ok {
							delete(settlementFromDeliveryID, deliveryID)
							// best-effort, non-blocking delivery of the state
							// before the channel is closed
							select {
							case done <- body.State:
							default:
							}
							close(done)
						}
					}

					// now find the *link for this input (remote) handle
					link, ok := linkFromInputHandle[inputHandle]
					if !ok {
						closeWithError(&Error{
							Condition:   ErrCondUnattachedHandle,
							Description: "received disposition frame referencing a handle that's not in use",
						}, fmt.Errorf("received disposition frame with unknown link input handle %d", inputHandle))
						continue
					}

					s.muxFrameToLink(link, fr)
				}
				continue

			case *frames.PerformFlow:
				if body.NextIncomingID == nil {
					// This is a protocol error:
					//       "[...] MUST be set if the peer has received
					//        the begin frame for the session"
					closeWithError(&Error{
						Condition:   ErrCondNotAllowed,
						Description: "next-incoming-id not set after session established",
					}, errors.New("protocol error: received flow without next-incoming-id after session established"))
					continue
				}

				// "When the endpoint receives a flow frame from its peer,
				// it MUST update the next-incoming-id directly from the
				// next-outgoing-id of the frame, and it MUST update the
				// remote-outgoing-window directly from the outgoing-window
				// of the frame."
				nextIncomingID = body.NextOutgoingID
				remoteOutgoingWindow = body.OutgoingWindow

				// "The remote-incoming-window is computed as follows:
				//
				// next-incoming-id(flow) + incoming-window(flow) - next-outgoing-id(endpoint)
				//
				// If the next-incoming-id field of the flow frame is not set, then remote-incoming-window is computed as follows:
				//
				// initial-outgoing-id(endpoint) + incoming-window(flow) - next-outgoing-id(endpoint)"
				//
				// (the unset case was rejected above, so only the first formula applies here)
				remoteIncomingWindow = body.IncomingWindow - nextOutgoingID
				remoteIncomingWindow += *body.NextIncomingID
				debug.Log(3, "RX (Session %p): flow - remoteOutgoingWindow: %d remoteIncomingWindow: %d nextOutgoingID: %d", s, remoteOutgoingWindow, remoteIncomingWindow, nextOutgoingID)

				// Send to link if handle is set
				if body.Handle != nil {
					link, ok := linkFromInputHandle[*body.Handle]
					if !ok {
						// dereference the handle so the error reports the
						// handle value rather than the pointer's address
						closeWithError(&Error{
							Condition:   ErrCondUnattachedHandle,
							Description: "received flow frame referencing a handle that's not in use",
						}, fmt.Errorf("received flow frame with unknown link handle %d", *body.Handle))
						continue
					}

					s.muxFrameToLink(link, fr)
					continue
				}

				// session-level flow: reply with our current state if the peer
				// asked for an echo and we're still allowed to send
				if body.Echo && !closeInProgress {
					niID := nextIncomingID
					resp := &frames.PerformFlow{
						NextIncomingID: &niID,
						IncomingWindow: s.incomingWindow,
						NextOutgoingID: nextOutgoingID,
						OutgoingWindow: s.outgoingWindow,
					}
					s.txFrame(&frameContext{Ctx: context.Background()}, resp)
				}

			case *frames.PerformAttach:
				// On Attach response link should be looked up by name, then added
				// to the links map with the remote's handle contained in this
				// attach frame.
				//
				// Note body.Role is the remote peer's role, we reverse for the local key.
				s.linksMu.RLock()
				link, linkOk := s.linksByKey[linkKey{name: body.Name, role: !body.Role}]
				s.linksMu.RUnlock()
				if !linkOk {
					closeWithError(&Error{
						Condition:   ErrCondNotAllowed,
						Description: "received mismatched attach frame",
					}, fmt.Errorf("protocol error: received mismatched attach frame %+v", body))
					continue
				}

				// track the input (remote) handle number for this link.
				// note that it might be a different value than our output handle.
				link.inputHandle = body.Handle
				linkFromInputHandle[link.inputHandle] = link

				s.muxFrameToLink(link, fr)

				debug.Log(1, "RX (Session %p): link %s attached, input handle %d, output handle %d", s, link.key.name, link.inputHandle, link.outputHandle)

			case *frames.PerformTransfer:
				s.needFlowCount++
				// "Upon receiving a transfer, the receiving endpoint will
				// increment the next-incoming-id to match the implicit
				// transfer-id of the incoming transfer plus one, as well
				// as decrementing the remote-outgoing-window, and MAY
				// (depending on policy) decrement its incoming-window."
				nextIncomingID++
				// don't loop to intmax
				if remoteOutgoingWindow > 0 {
					remoteOutgoingWindow--
				}
				link, ok := linkFromInputHandle[body.Handle]
				if !ok {
					closeWithError(&Error{
						Condition:   ErrCondUnattachedHandle,
						Description: "received transfer frame referencing a handle that's not in use",
					}, fmt.Errorf("received transfer frame with unknown link handle %d", body.Handle))
					continue
				}

				s.muxFrameToLink(link, fr)

				// if this message is received unsettled and link rcv-settle-mode == second,
				// add to inputHandleFromRemoteDeliveryID
				if !body.Settled && body.DeliveryID != nil && link.receiverSettleMode != nil && *link.receiverSettleMode == ReceiverSettleModeSecond {
					debug.Log(1, "RX (Session %p): adding handle %d to inputHandleFromRemoteDeliveryID. remote delivery ID: %d", s, body.Handle, *body.DeliveryID)
					inputHandleFromRemoteDeliveryID[*body.DeliveryID] = body.Handle
				}

				// Update peer's outgoing window if half has been consumed.
				if s.needFlowCount >= s.incomingWindow/2 && !closeInProgress {
					debug.Log(3, "RX (Session %p): channel %d: flow - s.needFlowCount(%d) >= s.incomingWindow(%d)/2\n", s, s.channel, s.needFlowCount, s.incomingWindow)
					s.needFlowCount = 0
					nID := nextIncomingID
					flow := &frames.PerformFlow{
						NextIncomingID: &nID,
						IncomingWindow: s.incomingWindow,
						NextOutgoingID: nextOutgoingID,
						OutgoingWindow: s.outgoingWindow,
					}
					s.txFrame(&frameContext{Ctx: context.Background()}, flow)
				}

			case *frames.PerformDetach:
				link, ok := linkFromInputHandle[body.Handle]
				if !ok {
					closeWithError(&Error{
						Condition:   ErrCondUnattachedHandle,
						Description: "received detach frame referencing a handle that's not in use",
					}, fmt.Errorf("received detach frame with unknown link handle %d", body.Handle))
					continue
				}
				s.muxFrameToLink(link, fr)

				// we received a detach frame and sent it to the link.
				// this was either the response to a client-side initiated
				// detach or our peer detached us. either way, now that
				// the link has processed the frame it's detached so we
				// are safe to clean up its state.
				delete(linkFromInputHandle, link.inputHandle)
				delete(deliveryIDFromOutputHandle, link.outputHandle)
				s.deallocateHandle(link)

			case *frames.PerformEnd:
				// there are two possibilities:
				// - this is the ack to a client-side Close()
				// - the peer is ending the session so we must ack

				if closeInProgress {
					return
				}

				// peer ended the session with an error, save it and send the ack
				if body.Error != nil {
					s.doneErr = body.Error
				}

				fr := frames.PerformEnd{}
				s.txFrame(&frameContext{Ctx: context.Background()}, &fr)

				// per spec, when end is received, we're no longer allowed to receive frames
				return

			default:
				debug.Log(1, "RX (Session %p): unexpected frame: %s\n", s, body)
				closeWithError(&Error{
					Condition:   ErrCondInternalError,
					Description: "session received unexpected frame",
				}, fmt.Errorf("internal error: unexpected frame %T", body))
			}

		case env := <-txTransfer:
			fr := &env.Frame
			// record current delivery ID
			var deliveryID uint32
			// needsDeliveryID is compared by address: it acts as a sentinel
			// marking a transfer that must be assigned a new delivery ID
			if fr.DeliveryID == &needsDeliveryID {
				deliveryID = nextDeliveryID
				fr.DeliveryID = &deliveryID
				nextDeliveryID++
				deliveryIDFromOutputHandle[fr.Handle] = deliveryID

				if !fr.Settled {
					inputHandleFromDeliveryID[deliveryID] = env.InputHandle
				}
			} else {
				// if fr.DeliveryID is nil it must have been added
				// to deliveryIDFromOutputHandle already (multi-frame transfer)
				deliveryID = deliveryIDFromOutputHandle[fr.Handle]
			}

			// log after the delivery ID has been assigned
			debug.Log(2, "TX (Session %p): %d, %s", s, s.channel, fr)

			// frame has been sender-settled, remove from map.
			// this should only come into play for multi-frame transfers.
			if fr.Settled {
				delete(inputHandleFromDeliveryID, deliveryID)
			}

			s.txFrame(env.FrameCtx, fr)

			// wait for the outcome of the write before touching flow-control state
			select {
			case <-env.FrameCtx.Done:
				if env.FrameCtx.Err != nil {
					// transfer wasn't sent, don't update state
					continue
				}
				// transfer was written to the network
			case <-s.conn.done:
				// the write failed, Conn is going down
				continue
			}

			// if not settled, add done chan to map
			if !fr.Settled && fr.Done != nil {
				settlementFromDeliveryID[deliveryID] = fr.Done
			} else if fr.Done != nil {
				// sender-settled, close done now that the transfer has been sent
				close(fr.Done)
			}

			// "Upon sending a transfer, the sending endpoint will increment
			// its next-outgoing-id, decrement its remote-incoming-window,
			// and MAY (depending on policy) decrement its outgoing-window."
			nextOutgoingID++
			// don't decrement if we're at 0 or we could loop to int max
			if remoteIncomingWindow != 0 {
				remoteIncomingWindow--
			}

		case env := <-tx:
			fr := env.FrameBody
			debug.Log(2, "TX (Session %p): %d, %s", s, s.channel, fr)
			switch fr := env.FrameBody.(type) {
			case *frames.PerformDisposition:
				if fr.Settled && fr.Role == encoding.RoleSender {
					// sender with a peer that's in mode second; sending confirmation of disposition.
					// disposition frames can reference a range of delivery IDs, although it's highly
					// likely in this case there will only be one.
					start := fr.First
					end := start
					if fr.Last != nil {
						end = *fr.Last
					}
					// 64-bit counter so the loop terminates when end is math.MaxUint32
					for id := uint64(start); id <= uint64(end); id++ {
						deliveryID := uint32(id)
						// send delivery state to the channel and close it to signal
						// that the delivery has completed (RSM == ModeSecond)
						if done, ok := settlementFromDeliveryID[deliveryID]; ok {
							delete(settlementFromDeliveryID, deliveryID)
							select {
							case done <- fr.State:
							default:
							}
							close(done)
						}
					}
				}
				s.txFrame(env.FrameCtx, fr)
			case *frames.PerformFlow:
				// stamp the outgoing flow frame with the session's current
				// flow-control state before sending
				niID := nextIncomingID
				fr.NextIncomingID = &niID
				fr.IncomingWindow = s.incomingWindow
				fr.NextOutgoingID = nextOutgoingID
				fr.OutgoingWindow = s.outgoingWindow
				s.txFrame(env.FrameCtx, fr)
			case *frames.PerformTransfer:
				// programmer error: transfers need the delivery ID and window
				// bookkeeping performed in the txTransfer case
				panic("transfer frames must use txTransfer")
			default:
				s.txFrame(env.FrameCtx, fr)
			}
		}
	}
}