in receiver.go [704:768]
// muxHandleFrame dispatches an incoming frame body according to its concrete type.
//
//   - *frames.PerformTransfer is handed to r.muxReceive.
//   - *frames.PerformFlow without Echo optionally completes a drain; with Echo a
//     flow frame carrying the current delivery count and link credit is queued
//     on the session's tx channel.
//   - *frames.PerformDisposition removes the range First..Last from the
//     in-flight set and reports how many entries were removed to r.onSettlement.
//   - Any other frame type is delegated to r.l.muxHandleFrame.
//
// It returns nil in every case except two: when the session's done channel
// fires while the flow response is being queued (r.l.session.doneErr is
// returned), and when the frame is delegated (the link handler's result is
// returned).
func (r *Receiver) muxHandleFrame(fr frames.FrameBody) error {
	debug.Log(2, "RX (Receiver %p): %s", r, fr)

	switch body := fr.(type) {
	case *frames.PerformTransfer:
		// message frame
		r.muxReceive(*body)
		return nil

	case *frames.PerformFlow:
		// flow control frame
		if !body.Echo {
			// A 'drain' flag on a frame sent to the _receiver_ means the service has
			// seen and acknowledged our drain, so signal whomever is waiting on it.
			if !r.autoSendFlow && body.Drain {
				// no credits remain active at this point
				r.l.linkCredit = 0
				r.creditor.EndDrain()
			}
			return nil
		}

		// The peer asked for an echo. Snapshot the counters into locals first:
		// the response refers to them by pointer, and copying prevents a race.
		credit := r.l.linkCredit
		delivered := r.l.deliveryCount

		echo := &frames.PerformFlow{
			Handle:        &r.l.outputHandle,
			DeliveryCount: &delivered,
			LinkCredit:    &credit, // max number of messages
		}
		envelope := frameBodyEnvelope{
			FrameCtx:  &frameContext{Ctx: context.Background()},
			FrameBody: echo,
		}

		// send flow, giving up if the link closes or the session ends first
		select {
		case r.l.session.tx <- envelope:
			debug.Log(2, "TX (Receiver %p): mux frame to Session (%p): %d, %s", r, r.l.session, r.l.session.channel, echo)
			return nil
		case <-r.l.close:
			return nil
		case <-r.l.session.done:
			return r.l.session.doneErr
		}

	case *frames.PerformDisposition:
		// Unblock receivers waiting for message disposition and bubble any
		// disposition error up to them.
		//
		// A rejected state isn't required to carry an error (dead lettering a
		// message, for instance, produces a rejection without one), so the
		// error is only propagated when it is actually set.
		var rejectErr error
		if rejected, ok := body.State.(*encoding.StateRejected); ok && rejected.Error != nil {
			rejectErr = rejected.Error
		}

		// removal from the in-flight map also removes the message from the unsettled map
		settled := r.inFlight.remove(body.First, body.Last, rejectErr, func(m *Message) {
			r.deleteUnsettled()
			m.onSettlement()
		})
		r.onSettlement(settled)
		return nil
	}

	// every remaining frame type is handled by the underlying link
	return r.l.muxHandleFrame(fr)
}