in sender.go [533:607]
func (s *Sender) muxHandleFrame(fr frames.FrameBody) error {
debug.Log(2, "RX (Sender %p): %s", s, fr)
switch fr := fr.(type) {
// flow control frame
case *frames.PerformFlow:
// the sender's link-credit variable MUST be set according to this formula when flow information is given by the receiver:
// link-credit(snd) := delivery-count(rcv) + link-credit(rcv) - delivery-count(snd)
linkCredit := *fr.LinkCredit - s.l.deliveryCount
if fr.DeliveryCount != nil {
// DeliveryCount can be nil if the receiver hasn't processed
// the attach. That shouldn't be the case here, but it's
// what ActiveMQ does.
linkCredit += *fr.DeliveryCount
}
s.l.linkCredit = linkCredit
if !fr.Echo {
return nil
}
var (
// copy because sent by pointer below; prevent race
deliveryCount = s.l.deliveryCount
)
// send flow
resp := &frames.PerformFlow{
Handle: &s.l.outputHandle,
DeliveryCount: &deliveryCount,
LinkCredit: &linkCredit, // max number of messages
}
select {
case s.l.session.tx <- frameBodyEnvelope{FrameCtx: &frameContext{Ctx: context.Background()}, FrameBody: resp}:
debug.Log(2, "TX (Sender %p): mux frame to Session (%p): %d, %s", s, s.l.session, s.l.session.channel, resp)
case <-s.l.close:
return nil
case <-s.l.session.done:
return s.l.session.doneErr
}
case *frames.PerformDisposition:
if fr.Settled {
return nil
}
// peer is in mode second, so we must send confirmation of disposition.
// NOTE: the ack must be sent through the session so it can close out
// the in-flight disposition.
dr := &frames.PerformDisposition{
Role: encoding.RoleSender,
First: fr.First,
Last: fr.Last,
Settled: true,
State: fr.State,
}
select {
case s.l.session.tx <- frameBodyEnvelope{FrameCtx: &frameContext{Ctx: context.Background()}, FrameBody: dr}:
debug.Log(2, "TX (Sender %p): mux frame to Session (%p): %d, %s", s, s.l.session, s.l.session.channel, dr)
case <-s.l.close:
return nil
case <-s.l.session.done:
return s.l.session.doneErr
}
return nil
default:
return s.l.muxHandleFrame(fr)
}
return nil
}