func()

in session.go [317:752]


// mux is the session's event loop. It runs until the underlying conn
// completes or the end performative exchange with the peer finishes.
//
// On each iteration it selects between:
//   - s.conn.done: the connection has completed; its error becomes s.doneErr.
//   - s.close: a client-side close request; an end performative is sent.
//   - s.rxQ: an incoming frame, which updates session flow-control state and
//     is routed to the owning link where applicable.
//   - s.txTransfer: an outgoing transfer frame (disabled while either flow
//     control window is zero or once the end performative has been sent).
//   - s.tx: any other outgoing frame (disabled once end has been sent).
//
// remoteBegin supplies the initial flow-control values advertised by the
// peer (next-outgoing-id, incoming-window, outgoing-window).
//
// On exit, s.doneErr is normalized to a *SessionError (unless it already
// is/contains a *ConnError) and s.done is closed to wake up waiters.
func (s *Session) mux(remoteBegin *frames.PerformBegin) {
	defer func() {
		if s.doneErr == nil {
			s.doneErr = &SessionError{}
		} else if connErr := (&ConnError{}); !errors.As(s.doneErr, &connErr) {
			// only wrap non-ConnError error types
			var amqpErr *Error
			if errors.As(s.doneErr, &amqpErr) {
				s.doneErr = &SessionError{RemoteErr: amqpErr}
			} else {
				s.doneErr = &SessionError{inner: s.doneErr}
			}
		}
		// Signal goroutines waiting on the session.
		close(s.done)
	}()

	var (
		// maps input (remote) handles to links
		linkFromInputHandle = make(map[uint32]*link)

		// maps local delivery IDs (sending transfers) to input (remote) handles
		inputHandleFromDeliveryID = make(map[uint32]uint32)

		// maps remote delivery IDs (receiving transfers) to input (remote) handles
		inputHandleFromRemoteDeliveryID = make(map[uint32]uint32)

		// maps delivery IDs to output (our) handles. used for multi-frame transfers
		deliveryIDFromOutputHandle = make(map[uint32]uint32)

		// maps delivery IDs to the settlement state channel
		settlementFromDeliveryID = make(map[uint32]chan encoding.DeliveryState)

		// tracks the next delivery ID for outgoing transfers
		nextDeliveryID uint32

		// flow control values
		nextOutgoingID       uint32
		nextIncomingID       = remoteBegin.NextOutgoingID
		remoteIncomingWindow = remoteBegin.IncomingWindow
		remoteOutgoingWindow = remoteBegin.OutgoingWindow

		closeInProgress bool // indicates the end performative has been sent
	)

	// closeWithError starts a mux-initiated close: it sends an end
	// performative carrying e1 to the peer and records e2 as the error
	// to surface to the caller. It is a no-op (other than logging) if an
	// end performative has already been sent.
	closeWithError := func(e1 *Error, e2 error) {
		if closeInProgress {
			debug.Log(3, "TX (Session %p): close already pending, discarding %v", s, e1)
			return
		}

		closeInProgress = true
		s.doneErr = e2
		s.txFrame(&frameContext{Ctx: context.Background()}, &frames.PerformEnd{Error: e1})
		close(s.endSent)
	}

	for {
		txTransfer := s.txTransfer
		// disable txTransfer if flow control windows have been exceeded
		if remoteIncomingWindow == 0 || s.outgoingWindow == 0 {
			debug.Log(1, "TX (Session %p): disabling txTransfer - window exceeded. remoteIncomingWindow: %d outgoingWindow: %d",
				s, remoteIncomingWindow, s.outgoingWindow)
			txTransfer = nil
		}

		tx := s.tx
		closed := s.close
		if closeInProgress {
			// swap out channel so it no longer triggers
			closed = nil

			// once the end performative is sent, we're not allowed to send any frames
			tx = nil
			txTransfer = nil
		}

		// notes on client-side closing session
		// when session is closed, we must keep the mux running until the ack'ing end performative
		// has been received. during this window, the session is allowed to receive frames but cannot
		// send them.
		// client-side close happens either by user calling Session.Close() or due to mux initiated
		// close due to a violation of some invariant (see sending &Error{} to s.close). in the case
		// that both code paths have been triggered, we must be careful to preserve the error that
		// triggered the mux initiated close so it can be surfaced to the caller.

		select {
		// conn has completed, exit
		case <-s.conn.done:
			s.doneErr = s.conn.doneErr
			return

		case <-closed:
			if closeInProgress {
				// a client-side close due to protocol error is in progress
				continue
			}
			// session is being closed by the client
			closeInProgress = true
			s.txFrame(&frameContext{Ctx: context.Background()}, &frames.PerformEnd{})
			close(s.endSent)

		// incoming frame
		case q := <-s.rxQ.Wait():
			fr := *q.Dequeue()
			s.rxQ.Release(q)
			debug.Log(2, "RX (Session %p): %s", s, fr)

			switch body := fr.(type) {
			// Disposition frames can reference transfers from more than one
			// link. Send this frame to all of them.
			case *frames.PerformDisposition:
				start := body.First
				end := start
				if body.Last != nil {
					end = *body.Last
				}
				// iterate with a 64-bit counter: a uint32 counter wraps to zero
				// after 0xFFFFFFFF, so "deliveryID <= end" would never become
				// false when the peer sends last == 0xFFFFFFFF.
				for id := uint64(start); id <= uint64(end); id++ {
					deliveryID := uint32(id)

					// find the input (remote) handle for this delivery ID.
					// default to the map for local delivery IDs.
					handles := inputHandleFromDeliveryID
					if body.Role == encoding.RoleSender {
						// the disposition frame is meant for a receiver
						// so look in the map for remote delivery IDs.
						handles = inputHandleFromRemoteDeliveryID
					}

					inputHandle, ok := handles[deliveryID]
					if !ok {
						debug.Log(2, "RX (Session %p): role %s: didn't find deliveryID %d in inputHandlesByDeliveryID map", s, body.Role, deliveryID)
						continue
					}
					delete(handles, deliveryID)

					if body.Settled && body.Role == encoding.RoleReceiver {
						// check if settlement confirmation was requested, if so
						// confirm by closing channel (RSM == ModeFirst)
						if done, ok := settlementFromDeliveryID[deliveryID]; ok {
							delete(settlementFromDeliveryID, deliveryID)
							// non-blocking send: deliver the state if it can be
							// accepted right now, then always close the channel.
							select {
							case done <- body.State:
							default:
							}
							close(done)
						}
					}

					// now find the *link for this input (remote) handle
					link, ok := linkFromInputHandle[inputHandle]
					if !ok {
						closeWithError(&Error{
							Condition:   ErrCondUnattachedHandle,
							Description: "received disposition frame referencing a handle that's not in use",
						}, fmt.Errorf("received disposition frame with unknown link input handle %d", inputHandle))
						continue
					}

					s.muxFrameToLink(link, fr)
				}
				continue
			case *frames.PerformFlow:
				if body.NextIncomingID == nil {
					// This is a protocol error:
					//       "[...] MUST be set if the peer has received
					//        the begin frame for the session"
					closeWithError(&Error{
						Condition:   ErrCondNotAllowed,
						Description: "next-incoming-id not set after session established",
					}, errors.New("protocol error: received flow without next-incoming-id after session established"))
					continue
				}

				// "When the endpoint receives a flow frame from its peer,
				// it MUST update the next-incoming-id directly from the
				// next-outgoing-id of the frame, and it MUST update the
				// remote-outgoing-window directly from the outgoing-window
				// of the frame."
				nextIncomingID = body.NextOutgoingID
				remoteOutgoingWindow = body.OutgoingWindow

				// "The remote-incoming-window is computed as follows:
				//
				// next-incoming-id(flow) + incoming-window(flow) - next-outgoing-id(endpoint)
				//
				// If the next-incoming-id field of the flow frame is not set, then remote-incoming-window is computed as follows:
				//
				// initial-outgoing-id(endpoint) + incoming-window(flow) - next-outgoing-id(endpoint)"
				remoteIncomingWindow = body.IncomingWindow - nextOutgoingID
				remoteIncomingWindow += *body.NextIncomingID
				debug.Log(3, "RX (Session %p): flow - remoteOutgoingWindow: %d remoteIncomingWindow: %d nextOutgoingID: %d", s, remoteOutgoingWindow, remoteIncomingWindow, nextOutgoingID)

				// Send to link if handle is set
				if body.Handle != nil {
					link, ok := linkFromInputHandle[*body.Handle]
					if !ok {
						// body.Handle is a pointer; dereference it so the error
						// reports the handle number rather than an address.
						closeWithError(&Error{
							Condition:   ErrCondUnattachedHandle,
							Description: "received flow frame referencing a handle that's not in use",
						}, fmt.Errorf("received flow frame with unknown link handle %d", *body.Handle))
						continue
					}

					s.muxFrameToLink(link, fr)
					continue
				}

				// session-level flow (no handle): reply with our current state
				// if the peer asked for an echo and we may still send frames.
				if body.Echo && !closeInProgress {
					niID := nextIncomingID
					resp := &frames.PerformFlow{
						NextIncomingID: &niID,
						IncomingWindow: s.incomingWindow,
						NextOutgoingID: nextOutgoingID,
						OutgoingWindow: s.outgoingWindow,
					}
					s.txFrame(&frameContext{Ctx: context.Background()}, resp)
				}

			case *frames.PerformAttach:
				// On Attach response link should be looked up by name, then added
				// to the links map with the remote's handle contained in this
				// attach frame.
				//
				// Note body.Role is the remote peer's role, we reverse for the local key.
				s.linksMu.RLock()
				link, linkOk := s.linksByKey[linkKey{name: body.Name, role: !body.Role}]
				s.linksMu.RUnlock()
				if !linkOk {
					closeWithError(&Error{
						Condition:   ErrCondNotAllowed,
						Description: "received mismatched attach frame",
					}, fmt.Errorf("protocol error: received mismatched attach frame %+v", body))
					continue
				}

				// track the input (remote) handle number for this link.
				// note that it might be a different value than our output handle.
				link.inputHandle = body.Handle
				linkFromInputHandle[link.inputHandle] = link

				s.muxFrameToLink(link, fr)

				debug.Log(1, "RX (Session %p): link %s attached, input handle %d, output handle %d", s, link.key.name, link.inputHandle, link.outputHandle)

			case *frames.PerformTransfer:
				s.needFlowCount++
				// "Upon receiving a transfer, the receiving endpoint will
				// increment the next-incoming-id to match the implicit
				// transfer-id of the incoming transfer plus one, as well
				// as decrementing the remote-outgoing-window, and MAY
				// (depending on policy) decrement its incoming-window."
				nextIncomingID++
				// don't loop to intmax
				if remoteOutgoingWindow > 0 {
					remoteOutgoingWindow--
				}
				link, ok := linkFromInputHandle[body.Handle]
				if !ok {
					closeWithError(&Error{
						Condition:   ErrCondUnattachedHandle,
						Description: "received transfer frame referencing a handle that's not in use",
					}, fmt.Errorf("received transfer frame with unknown link handle %d", body.Handle))
					continue
				}

				s.muxFrameToLink(link, fr)

				// if this message is received unsettled and link rcv-settle-mode == second, add to handlesByRemoteDeliveryID
				if !body.Settled && body.DeliveryID != nil && link.receiverSettleMode != nil && *link.receiverSettleMode == ReceiverSettleModeSecond {
					debug.Log(1, "RX (Session %p): adding handle %d to inputHandleFromRemoteDeliveryID. remote delivery ID: %d", s, body.Handle, *body.DeliveryID)
					inputHandleFromRemoteDeliveryID[*body.DeliveryID] = body.Handle
				}

				// Update peer's outgoing window if half has been consumed.
				if s.needFlowCount >= s.incomingWindow/2 && !closeInProgress {
					debug.Log(3, "RX (Session %p): channel %d: flow - s.needFlowCount(%d) >= s.incomingWindow(%d)/2\n", s, s.channel, s.needFlowCount, s.incomingWindow)
					s.needFlowCount = 0
					nID := nextIncomingID
					flow := &frames.PerformFlow{
						NextIncomingID: &nID,
						IncomingWindow: s.incomingWindow,
						NextOutgoingID: nextOutgoingID,
						OutgoingWindow: s.outgoingWindow,
					}
					s.txFrame(&frameContext{Ctx: context.Background()}, flow)
				}

			case *frames.PerformDetach:
				link, ok := linkFromInputHandle[body.Handle]
				if !ok {
					closeWithError(&Error{
						Condition:   ErrCondUnattachedHandle,
						Description: "received detach frame referencing a handle that's not in use",
					}, fmt.Errorf("received detach frame with unknown link handle %d", body.Handle))
					continue
				}
				s.muxFrameToLink(link, fr)

				// we received a detach frame and sent it to the link.
				// this was either the response to a client-side initiated
				// detach or our peer detached us. either way, now that
				// the link has processed the frame it's detached so we
				// are safe to clean up its state.
				delete(linkFromInputHandle, link.inputHandle)
				delete(deliveryIDFromOutputHandle, link.outputHandle)
				s.deallocateHandle(link)

			case *frames.PerformEnd:
				// there are two possibilities:
				// - this is the ack to a client-side Close()
				// - the peer is ending the session so we must ack

				if closeInProgress {
					return
				}

				// peer detached us with an error, save it and send the ack
				if body.Error != nil {
					s.doneErr = body.Error
				}

				fr := frames.PerformEnd{}
				s.txFrame(&frameContext{Ctx: context.Background()}, &fr)

				// per spec, when end is received, we're no longer allowed to receive frames
				return

			default:
				debug.Log(1, "RX (Session %p): unexpected frame: %s\n", s, body)
				closeWithError(&Error{
					Condition:   ErrCondInternalError,
					Description: "session received unexpected frame",
				}, fmt.Errorf("internal error: unexpected frame %T", body))
			}

		case env := <-txTransfer:
			fr := &env.Frame
			// record current delivery ID
			var deliveryID uint32
			// needsDeliveryID is a sentinel compared by address: the frame
			// is asking the mux to assign the next delivery ID.
			if fr.DeliveryID == &needsDeliveryID {
				deliveryID = nextDeliveryID
				fr.DeliveryID = &deliveryID
				nextDeliveryID++
				deliveryIDFromOutputHandle[fr.Handle] = deliveryID

				if !fr.Settled {
					inputHandleFromDeliveryID[deliveryID] = env.InputHandle
				}
			} else {
				// if fr.DeliveryID is nil it must have been added
				// to deliveryIDByHandle already (multi-frame transfer)
				deliveryID = deliveryIDFromOutputHandle[fr.Handle]
			}

			// log after the delivery ID has been assigned
			debug.Log(2, "TX (Session %p): %d, %s", s, s.channel, fr)

			// frame has been sender-settled, remove from map.
			// this should only come into play for multi-frame transfers.
			if fr.Settled {
				delete(inputHandleFromDeliveryID, deliveryID)
			}

			s.txFrame(env.FrameCtx, fr)

			// wait for the outcome of the write before touching flow-control state
			select {
			case <-env.FrameCtx.Done:
				if env.FrameCtx.Err != nil {
					// transfer wasn't sent, don't update state
					continue
				}
				// transfer was written to the network
			case <-s.conn.done:
				// the write failed, Conn is going down
				continue
			}

			// if not settled, add done chan to map
			if !fr.Settled && fr.Done != nil {
				settlementFromDeliveryID[deliveryID] = fr.Done
			} else if fr.Done != nil {
				// sender-settled, close done now that the transfer has been sent
				close(fr.Done)
			}

			// "Upon sending a transfer, the sending endpoint will increment
			// its next-outgoing-id, decrement its remote-incoming-window,
			// and MAY (depending on policy) decrement its outgoing-window."
			nextOutgoingID++
			// don't decrement if we're at 0 or we could loop to int max
			if remoteIncomingWindow != 0 {
				remoteIncomingWindow--
			}

		case env := <-tx:
			fr := env.FrameBody
			debug.Log(2, "TX (Session %p): %d, %s", s, s.channel, fr)
			switch fr := env.FrameBody.(type) {
			case *frames.PerformDisposition:
				if fr.Settled && fr.Role == encoding.RoleSender {
					// sender with a peer that's in mode second; sending confirmation of disposition.
					// disposition frames can reference a range of delivery IDs, although it's highly
					// likely in this case there will only be one.
					start := fr.First
					end := start
					if fr.Last != nil {
						end = *fr.Last
					}
					// 64-bit counter so the loop terminates even when
					// end == 0xFFFFFFFF (a uint32 counter would wrap to zero).
					for id := uint64(start); id <= uint64(end); id++ {
						deliveryID := uint32(id)

						// send delivery state to the channel and close it to signal
						// that the delivery has completed (RSM == ModeSecond)
						if done, ok := settlementFromDeliveryID[deliveryID]; ok {
							delete(settlementFromDeliveryID, deliveryID)
							select {
							case done <- fr.State:
							default:
							}
							close(done)
						}
					}
				}
				s.txFrame(env.FrameCtx, fr)
			case *frames.PerformFlow:
				// stamp the outgoing flow frame with the session's current
				// flow-control state before sending it.
				niID := nextIncomingID
				fr.NextIncomingID = &niID
				fr.IncomingWindow = s.incomingWindow
				fr.NextOutgoingID = nextOutgoingID
				fr.OutgoingWindow = s.outgoingWindow
				s.txFrame(env.FrameCtx, fr)
			case *frames.PerformTransfer:
				panic("transfer frames must use txTransfer")
			default:
				s.txFrame(env.FrameCtx, fr)
			}
		}
	}
}