func()

in sender.go [435:529]


func (s *Sender) mux(hooks senderTestHooks) {
	if hooks.MuxSelect == nil {
		hooks.MuxSelect = nopHook
	}
	if hooks.MuxTransfer == nil {
		hooks.MuxTransfer = nopHook
	}

	defer func() {
		close(s.l.done)
	}()

Loop:
	for {
		var outgoingTransfers chan transferEnvelope
		if s.l.linkCredit > 0 {
			debug.Log(1, "TX (Sender %p) (enable): target: %q, link credit: %d, deliveryCount: %d", s, s.l.target.Address, s.l.linkCredit, s.l.deliveryCount)
			outgoingTransfers = s.transfers
		} else {
			debug.Log(1, "TX (Sender %p) (pause): target: %q, link credit: %d, deliveryCount: %d", s, s.l.target.Address, s.l.linkCredit, s.l.deliveryCount)
		}

		closed := s.l.close
		if s.l.closeInProgress {
			// swap out channel so it no longer triggers
			closed = nil

			// disable sending once closing is in progress.
			// this prevents races with mux shutdown and
			// the peer sending disposition frames.
			outgoingTransfers = nil
		}

		hooks.MuxSelect()

		select {
		// received frame
		case q := <-s.l.rxQ.Wait():
			// populated queue
			fr := *q.Dequeue()
			s.l.rxQ.Release(q)

			// if muxHandleFrame returns an error it means the mux must terminate.
			// note that in the case of a client-side close due to an error, nil
			// is returned in order to keep the mux running to ack the detach frame.
			if err := s.muxHandleFrame(fr); err != nil {
				s.l.doneErr = err
				return
			}

		// send data
		case env := <-outgoingTransfers:
			hooks.MuxTransfer()
			select {
			case s.l.session.txTransfer <- env:
				debug.Log(2, "TX (Sender %p): mux transfer to Session: %d, %s", s, s.l.session.channel, env.Frame)
				// decrement link-credit after entire message transferred
				if !env.Frame.More {
					s.l.deliveryCount++
					s.l.linkCredit--
					// we are the sender and we keep track of the peer's link credit
					debug.Log(3, "TX (Sender %p): link: %s, link credit: %d", s, s.l.key.name, s.l.linkCredit)
				}
				continue Loop
			case <-s.l.close:
				continue Loop
			case <-s.l.session.done:
				continue Loop
			}

		case <-closed:
			if s.l.closeInProgress {
				// a client-side close due to protocol error is in progress
				continue
			}

			// sender is being closed by the client
			s.l.closeInProgress = true
			fr := &frames.PerformDetach{
				Handle: s.l.outputHandle,
				Closed: true,
			}
			s.l.txFrame(&frameContext{Ctx: context.Background()}, fr)

		case <-s.l.session.done:
			s.l.doneErr = s.l.session.doneErr
			return

		case <-s.rollback:
			s.l.deliveryCount--
			s.l.linkCredit++
			debug.Log(3, "TX (Sender %p): rollback link: %s, link credit: %d", s, s.l.key.name, s.l.linkCredit)
		}
	}
}