func()

in receiver.go [704:768]


func (r *Receiver) muxHandleFrame(fr frames.FrameBody) error {
	debug.Log(2, "RX (Receiver %p): %s", r, fr)
	switch fr := fr.(type) {
	// message frame
	case *frames.PerformTransfer:
		r.muxReceive(*fr)

	// flow control frame
	case *frames.PerformFlow:
		if !fr.Echo {
			// if the 'drain' flag has been set in the frame sent to the _receiver_ then
			// we signal whomever is waiting (the service has seen and acknowledged our drain)
			if fr.Drain && !r.autoSendFlow {
				r.l.linkCredit = 0 // we have no active credits at this point.
				r.creditor.EndDrain()
			}
			return nil
		}

		var (
			// copy because sent by pointer below; prevent race
			linkCredit    = r.l.linkCredit
			deliveryCount = r.l.deliveryCount
		)

		// send flow
		resp := &frames.PerformFlow{
			Handle:        &r.l.outputHandle,
			DeliveryCount: &deliveryCount,
			LinkCredit:    &linkCredit, // max number of messages
		}

		select {
		case r.l.session.tx <- frameBodyEnvelope{FrameCtx: &frameContext{Ctx: context.Background()}, FrameBody: resp}:
			debug.Log(2, "TX (Receiver %p): mux frame to Session (%p): %d, %s", r, r.l.session, r.l.session.channel, resp)
		case <-r.l.close:
			return nil
		case <-r.l.session.done:
			return r.l.session.doneErr
		}

	case *frames.PerformDisposition:
		// Unblock receivers waiting for message disposition
		// bubble disposition error up to the receiver
		var dispositionError error
		if state, ok := fr.State.(*encoding.StateRejected); ok {
			// state.Error isn't required to be filled out. For instance if you dead letter a message
			// you will get a rejected response that doesn't contain an error.
			if state.Error != nil {
				dispositionError = state.Error
			}
		}
		// removal from the in-flight map will also remove the message from the unsettled map
		count := r.inFlight.remove(fr.First, fr.Last, dispositionError, func(msg *Message) {
			r.deleteUnsettled()
			msg.onSettlement()
		})
		r.onSettlement(count)

	default:
		return r.l.muxHandleFrame(fr)
	}

	return nil
}