in sender.go [435:529]
// mux is the Sender's event loop. It closes s.l.done when it returns, which
// happens when muxHandleFrame reports an error or when the session is done;
// in both cases the reason is recorded in s.l.doneErr first.
//
// Each pass through the loop services exactly one of the following events:
//   - a received frame on s.l.rxQ, which is handed to muxHandleFrame
//   - an outgoing transfer on s.transfers, which is forwarded to the session
//   - a close request on s.l.close, which sends a closing detach frame
//   - termination of the session
//   - a request on s.rollback, which undoes the accounting for one delivery
//
// hooks carries optional callbacks invoked before the main select and before
// a transfer is forwarded; nil callbacks are replaced with nopHook.
func (s *Sender) mux(hooks senderTestHooks) {
	if hooks.MuxSelect == nil {
		hooks.MuxSelect = nopHook
	}
	if hooks.MuxTransfer == nil {
		hooks.MuxTransfer = nopHook
	}

	defer func() {
		close(s.l.done)
	}()

	for {
		hasCredit := s.l.linkCredit > 0
		if hasCredit {
			debug.Log(1, "TX (Sender %p) (enable): target: %q, link credit: %d, deliveryCount: %d", s, s.l.target.Address, s.l.linkCredit, s.l.deliveryCount)
		} else {
			debug.Log(1, "TX (Sender %p) (pause): target: %q, link credit: %d, deliveryCount: %d", s, s.l.target.Address, s.l.linkCredit, s.l.deliveryCount)
		}

		closing := s.l.closeInProgress

		// A nil channel is never ready in a select, so leaving txCh nil
		// pauses sending. Transfers are accepted only while there is link
		// credit and no close is in progress; the latter prevents races
		// with mux shutdown and the peer sending disposition frames.
		var txCh chan transferEnvelope
		if hasCredit && !closing {
			txCh = s.transfers
		}

		// Once a close is in progress the close channel is swapped for nil
		// so that it no longer triggers.
		closeCh := s.l.close
		if closing {
			closeCh = nil
		}

		hooks.MuxSelect()

		select {
		case queue := <-s.l.rxQ.Wait():
			// The queue is populated: take one frame, then hand the queue back.
			frame := *queue.Dequeue()
			s.l.rxQ.Release(queue)

			// A non-nil error from muxHandleFrame means the mux must
			// terminate. For a client-side close caused by an error it
			// returns nil instead, which keeps the mux running so the
			// detach frame can be acknowledged.
			err := s.muxHandleFrame(frame)
			if err != nil {
				s.l.doneErr = err
				return
			}

		case xfer := <-txCh:
			hooks.MuxTransfer()

			// Forward the transfer to the session unless the link is being
			// closed or the session has ended. Whatever happens here, the
			// outer loop simply runs again.
			select {
			case s.l.session.txTransfer <- xfer:
				debug.Log(2, "TX (Sender %p): mux transfer to Session: %d, %s", s, s.l.session.channel, xfer.Frame)

				// Link credit is only consumed once the entire message has
				// been transferred, i.e. on the frame with More unset.
				if !xfer.Frame.More {
					s.l.deliveryCount++
					s.l.linkCredit--
					// we are the sender and we keep track of the peer's link credit
					debug.Log(3, "TX (Sender %p): link: %s, link credit: %d", s, s.l.key.name, s.l.linkCredit)
				}
			case <-s.l.close:
				// close requested; handled on the next pass
			case <-s.l.session.done:
				// session ended; handled on the next pass
			}

		case <-closeCh:
			// When a client-side close due to a protocol error is already
			// in progress there is nothing more to do here.
			if !s.l.closeInProgress {
				// sender is being closed by the client
				s.l.closeInProgress = true
				s.l.txFrame(&frameContext{Ctx: context.Background()}, &frames.PerformDetach{
					Handle: s.l.outputHandle,
					Closed: true,
				})
			}

		case <-s.l.session.done:
			s.l.doneErr = s.l.session.doneErr
			return

		case <-s.rollback:
			// Undo the accounting for one delivery.
			s.l.deliveryCount--
			s.l.linkCredit++
			debug.Log(3, "TX (Sender %p): rollback link: %s, link credit: %d", s, s.l.key.name, s.l.linkCredit)
		}
	}
}