func()

in receiver.go [542:665]


// mux is the receiver's event loop.
//
// Each iteration it:
//  1. snapshots (and possibly resets) r.settlementCount under r.settlementCountMu,
//  2. when autoSendFlow is set and enough settlements have accumulated, hands the
//     reclaimed credit to r.creditor.IssueCredit,
//  3. asks r.creditor.FlowBits whether a flow frame is needed and sends it via muxFlow,
//  4. blocks in a select on incoming frames (r.l.rxQ), outgoing dispositions
//     (r.txDisposition), r.receiverReady, the close request (r.l.close) and
//     session termination (r.l.session.done).
//
// The loop returns once r.l.doneErr has been set by a failed credit/flow operation
// or by muxHandleFrame, or when the session's done channel fires. On return the
// deferred function clears r.inFlight with r.l.doneErr, calls r.creditor.EndDrain
// when autoSendFlow is off, and closes r.l.done.
//
// hooks supplies optional callbacks invoked once before the loop (MuxStart) and
// immediately before every select (MuxSelect); nil callbacks are replaced with
// nopHook (presumably a no-op; it is declared outside this block).
func (r *Receiver) mux(hooks receiverTestHooks) {
	// default any unset hooks so they can be called unconditionally below.
	if hooks.MuxSelect == nil {
		hooks.MuxSelect = nopHook
	}
	if hooks.MuxStart == nil {
		hooks.MuxStart = nopHook
	}

	// teardown performed on every exit path from the loop.
	defer func() {
		// unblock any in flight message dispositions
		r.inFlight.clear(r.l.doneErr)

		if !r.autoSendFlow {
			// unblock any pending drain requests
			r.creditor.EndDrain()
		}

		// closed last, after in-flight dispositions and drain requests were released.
		close(r.l.done)
	}()

	hooks.MuxStart()

	if r.autoSendFlow {
		// send the initial flow frame with the configured link credit. any error
		// is stored in doneErr and examined inside the loop below.
		// NOTE(review): if the first iteration enters the IssueCredit branch, this
		// value would be overwritten before it is checked — confirm that cannot
		// happen (settlementCount is expected to be 0 at this point).
		r.l.doneErr = r.muxFlow(r.l.linkCredit, false)
	}

	for {
		// captured once per iteration; only used for the debug logs below.
		msgLen := r.messagesQ.Len()

		r.settlementCountMu.Lock()
		// counter that accumulates the settled delivery count.
		// once the threshold has been reached, the counter is
		// reset and a flow frame is sent.
		previousSettlementCount := r.settlementCount
		if previousSettlementCount >= r.l.linkCredit {
			r.settlementCount = 0
		}
		r.settlementCountMu.Unlock()

		// once we have pending credit equal to or greater than our available credit, reclaim it.
		// we do this instead of settlementCount > 0 to prevent flow frames from being too chatty.
		// NOTE: we compare the settlementCount against the current link credit instead of some
		// fixed threshold to ensure credit is reclaimed in cases where the number of unsettled
		// messages remains high for whatever reason.
		if r.autoSendFlow && previousSettlementCount > 0 && previousSettlementCount >= r.l.linkCredit {
			debug.Log(1, "RX (Receiver %p) (auto): source: %q, inflight: %d, linkCredit: %d, deliveryCount: %d, messages: %d, unsettled: %d, settlementCount: %d, settleMode: %s",
				r, r.l.source.Address, r.inFlight.len(), r.l.linkCredit, r.l.deliveryCount, msgLen, r.countUnsettled(), previousSettlementCount, r.l.receiverSettleMode.String())
			r.l.doneErr = r.creditor.IssueCredit(previousSettlementCount)
		} else if r.l.linkCredit == 0 {
			// no credit outstanding and nothing reclaimed this iteration; log only.
			debug.Log(1, "RX (Receiver %p) (pause): source: %q, inflight: %d, linkCredit: %d, deliveryCount: %d, messages: %d, unsettled: %d, settlementCount: %d, settleMode: %s",
				r, r.l.source.Address, r.inFlight.len(), r.l.linkCredit, r.l.deliveryCount, msgLen, r.countUnsettled(), previousSettlementCount, r.l.receiverSettleMode.String())
		}

		// stop if the initial flow or the credit issuance above failed.
		if r.l.doneErr != nil {
			return
		}

		// ask the creditor whether a flow frame (new credit and/or drain) is pending.
		drain, credits := r.creditor.FlowBits(r.l.linkCredit)
		if drain || credits > 0 {
			debug.Log(1, "RX (Receiver %p) (flow): source: %q, inflight: %d, curLinkCredit: %d, newLinkCredit: %d, drain: %v, deliveryCount: %d, messages: %d, unsettled: %d, settlementCount: %d, settleMode: %s",
				r, r.l.source.Address, r.inFlight.len(), r.l.linkCredit, credits, drain, r.l.deliveryCount, msgLen, r.countUnsettled(), previousSettlementCount, r.l.receiverSettleMode.String())

			// send a flow frame.
			r.l.doneErr = r.muxFlow(credits, drain)
		}

		// stop if sending the flow frame failed.
		if r.l.doneErr != nil {
			return
		}

		// local copies of the channels so they can be disabled for this iteration;
		// a nil channel in a select is never ready.
		txDisposition := r.txDisposition
		closed := r.l.close
		if r.l.closeInProgress {
			// swap out channel so it no longer triggers
			closed = nil

			// disable sending of disposition frames once closing is in progress.
			// this is to prevent races between mux shutdown and clearing of
			// any in-flight dispositions.
			txDisposition = nil
		}

		hooks.MuxSelect()

		select {
		case q := <-r.l.rxQ.Wait():
			// populated queue
			// take a single frame and release the queue before handling it.
			fr := *q.Dequeue()
			r.l.rxQ.Release(q)

			// if muxHandleFrame returns an error it means the mux must terminate.
			// note that in the case of a client-side close due to an error, nil
			// is returned in order to keep the mux running to ack the detach frame.
			if err := r.muxHandleFrame(fr); err != nil {
				r.l.doneErr = err
				return
			}

		case env := <-txDisposition:
			// forward a queued disposition frame to the link.
			r.l.txFrame(env.FrameCtx, env.FrameBody)

		case <-r.receiverReady:
			// wake-up signal: loop around to re-evaluate the settlement count
			// and any pending flow request at the top of the loop.
			continue

		case <-closed:
			// NOTE(review): closed is set to nil above whenever closeInProgress is
			// true, so this guard appears to be defensive.
			if r.l.closeInProgress {
				// a client-side close due to protocol error is in progress
				continue
			}

			// receiver is being closed by the client
			r.l.closeInProgress = true
			fr := &frames.PerformDetach{
				Handle: r.l.outputHandle,
				Closed: true,
			}
			r.l.txFrame(&frameContext{Ctx: context.Background()}, fr)

		case <-r.l.session.done:
			// the owning session has terminated; adopt its error and exit.
			r.l.doneErr = r.l.session.doneErr
			return
		}
	}
}