in receiver.go [542:665]
// mux is the receiver's event loop. Each iteration it reclaims settled
// credit (when flow frames are sent automatically), sends any flow frame
// requested by the creditor, and then blocks until one of the following
// happens: a frame arrives, a disposition is queued for sending, the
// receiver signals readiness, a close is requested, or the session ends.
//
// The loop exits when r.l.doneErr becomes non-nil, when frame handling
// reports an error, or when the session is done. On exit the deferred
// cleanup clears in-flight dispositions, ends any pending drain (when flow
// frames are not sent automatically), and closes r.l.done.
//
// hooks carries optional test callbacks; unset ones are replaced by nopHook.
func (r *Receiver) mux(hooks receiverTestHooks) {
	// default the hooks so they can be invoked unconditionally below
	if hooks.MuxStart == nil {
		hooks.MuxStart = nopHook
	}
	if hooks.MuxSelect == nil {
		hooks.MuxSelect = nopHook
	}

	defer func() {
		// release anything blocked on an in-flight message disposition
		r.inFlight.clear(r.l.doneErr)

		// when flow frames aren't sent automatically, a drain request
		// may still be pending; unblock it
		if !r.autoSendFlow {
			r.creditor.EndDrain()
		}

		close(r.l.done)
	}()

	hooks.MuxStart()

	// send the initial flow frame; any error is stored in r.l.doneErr
	// and examined inside the loop.
	if r.autoSendFlow {
		r.l.doneErr = r.muxFlow(r.l.linkCredit, false)
	}

	for {
		queued := r.messagesQ.Len()

		// settlementCount accumulates the number of settled deliveries.
		// snapshot it and, once it has reached the current link credit,
		// reset it so the snapshot can be reclaimed as credit below.
		r.settlementCountMu.Lock()
		settled := r.settlementCount
		if settled >= r.l.linkCredit {
			r.settlementCount = 0
		}
		r.settlementCountMu.Unlock()

		// credit is reclaimed only once the pending amount is at least the
		// current link credit, rather than whenever it is non-zero, to keep
		// flow frames from being too chatty. comparing against the current
		// link credit (not a fixed threshold) ensures credit is still
		// reclaimed when the number of unsettled messages stays high.
		switch {
		case r.autoSendFlow && settled > 0 && settled >= r.l.linkCredit:
			debug.Log(1, "RX (Receiver %p) (auto): source: %q, inflight: %d, linkCredit: %d, deliveryCount: %d, messages: %d, unsettled: %d, settlementCount: %d, settleMode: %s",
				r, r.l.source.Address, r.inFlight.len(), r.l.linkCredit, r.l.deliveryCount, queued, r.countUnsettled(), settled, r.l.receiverSettleMode.String())
			r.l.doneErr = r.creditor.IssueCredit(settled)
		case r.l.linkCredit == 0:
			debug.Log(1, "RX (Receiver %p) (pause): source: %q, inflight: %d, linkCredit: %d, deliveryCount: %d, messages: %d, unsettled: %d, settlementCount: %d, settleMode: %s",
				r, r.l.source.Address, r.inFlight.len(), r.l.linkCredit, r.l.deliveryCount, queued, r.countUnsettled(), settled, r.l.receiverSettleMode.String())
		}

		if r.l.doneErr != nil {
			return
		}

		// ask the creditor whether a flow frame is due and, if so, send it
		wantDrain, newCredit := r.creditor.FlowBits(r.l.linkCredit)
		if wantDrain || newCredit > 0 {
			debug.Log(1, "RX (Receiver %p) (flow): source: %q, inflight: %d, curLinkCredit: %d, newLinkCredit: %d, drain: %v, deliveryCount: %d, messages: %d, unsettled: %d, settlementCount: %d, settleMode: %s",
				r, r.l.source.Address, r.inFlight.len(), r.l.linkCredit, newCredit, wantDrain, r.l.deliveryCount, queued, r.countUnsettled(), settled, r.l.receiverSettleMode.String())
			r.l.doneErr = r.muxFlow(newCredit, wantDrain)
		}

		if r.l.doneErr != nil {
			return
		}

		// once a close is in progress, nil out both channels so their select
		// cases can no longer fire: the close request must not trigger again,
		// and dispositions must not be sent, which prevents races between mux
		// shutdown and the clearing of in-flight dispositions.
		dispositions, closeReq := r.txDisposition, r.l.close
		if r.l.closeInProgress {
			dispositions, closeReq = nil, nil
		}

		hooks.MuxSelect()

		select {
		case queue := <-r.l.rxQ.Wait():
			// the queue has at least one frame; take it and hand the queue back
			frame := *queue.Dequeue()
			r.l.rxQ.Release(queue)

			// a non-nil error means the mux must terminate. for a client-side
			// close caused by an error, nil is returned instead so the mux
			// keeps running to ack the detach frame.
			if err := r.muxHandleFrame(frame); err != nil {
				r.l.doneErr = err
				return
			}

		case d := <-dispositions:
			r.l.txFrame(d.FrameCtx, d.FrameBody)

		case <-r.receiverReady:
			// nothing to do here; start the next iteration
			continue

		case <-closeReq:
			// skip if a client-side close due to protocol error is already
			// under way; otherwise the client is closing the receiver, so
			// mark the close as started and send the closing detach.
			if !r.l.closeInProgress {
				r.l.closeInProgress = true
				detach := &frames.PerformDetach{
					Handle: r.l.outputHandle,
					Closed: true,
				}
				r.l.txFrame(&frameContext{Ctx: context.Background()}, detach)
			}

		case <-r.l.session.done:
			// the owning session has ended; propagate its error and stop
			r.l.doneErr = r.l.session.doneErr
			return
		}
	}
}