func()

in sender.go [533:607]


// muxHandleFrame processes a single frame delivered to this sender.
//
//   - *frames.PerformFlow: recomputes s.l.linkCredit from the flow state in
//     the frame (when the frame carries a link-credit) and, if the frame has
//     Echo set, sends the sender's current flow state back via the session.
//   - *frames.PerformDisposition: if the disposition is not already settled,
//     sends a settled disposition for the same First/Last range and State
//     via the session.
//   - any other frame is delegated to s.l.muxHandleFrame.
//
// It returns s.l.session.doneErr if the session's done channel fires while a
// reply is waiting to be queued, whatever s.l.muxHandleFrame returns for
// delegated frames, and nil otherwise (including when s.l.close fires while
// a reply is waiting to be queued).
func (s *Sender) muxHandleFrame(fr frames.FrameBody) error {
	debug.Log(2, "RX (Sender %p): %s", s, fr)
	switch fr := fr.(type) {
	// flow control frame
	case *frames.PerformFlow:
		// LinkCredit is a pointer and is an optional field of the flow
		// performative, so it can be absent. Start from the last known
		// credit and only recompute it when the receiver supplied a value;
		// dereferencing unconditionally would panic on a nil LinkCredit.
		linkCredit := s.l.linkCredit
		if fr.LinkCredit != nil {
			// the sender's link-credit variable MUST be set according to this formula when flow information is given by the receiver:
			// link-credit(snd) := delivery-count(rcv) + link-credit(rcv) - delivery-count(snd)
			linkCredit = *fr.LinkCredit - s.l.deliveryCount
			if fr.DeliveryCount != nil {
				// DeliveryCount can be nil if the receiver hasn't processed
				// the attach. That shouldn't be the case here, but it's
				// what ActiveMQ does.
				linkCredit += *fr.DeliveryCount
			}

			s.l.linkCredit = linkCredit
		}

		// the peer only expects our flow state back when it asked for it
		if !fr.Echo {
			return nil
		}

		var (
			// copy because sent by pointer below; prevent race
			deliveryCount = s.l.deliveryCount
		)

		// send flow
		resp := &frames.PerformFlow{
			Handle:        &s.l.outputHandle,
			DeliveryCount: &deliveryCount,
			LinkCredit:    &linkCredit, // max number of messages
		}

		// queue the reply on the session, giving up if the link is closed
		// or the session terminates first
		select {
		case s.l.session.tx <- frameBodyEnvelope{FrameCtx: &frameContext{Ctx: context.Background()}, FrameBody: resp}:
			debug.Log(2, "TX (Sender %p): mux frame to Session (%p): %d, %s", s, s.l.session, s.l.session.channel, resp)
		case <-s.l.close:
			return nil
		case <-s.l.session.done:
			return s.l.session.doneErr
		}

	case *frames.PerformDisposition:
		// an already-settled disposition needs no confirmation from us
		if fr.Settled {
			return nil
		}

		// peer is in mode second, so we must send confirmation of disposition.
		// NOTE: the ack must be sent through the session so it can close out
		// the in-flight disposition.
		dr := &frames.PerformDisposition{
			Role:    encoding.RoleSender,
			First:   fr.First,
			Last:    fr.Last,
			Settled: true,
			State:   fr.State,
		}

		// queue the confirmation on the session, giving up if the link is
		// closed or the session terminates first
		select {
		case s.l.session.tx <- frameBodyEnvelope{FrameCtx: &frameContext{Ctx: context.Background()}, FrameBody: dr}:
			debug.Log(2, "TX (Sender %p): mux frame to Session (%p): %d, %s", s, s.l.session, s.l.session.channel, dr)
		case <-s.l.close:
			return nil
		case <-s.l.session.done:
			return s.l.session.doneErr
		}

		return nil

	default:
		// not sender-specific; let the shared link logic handle it
		return s.l.muxHandleFrame(fr)
	}

	return nil
}