in receiver.go [312:365]
func (r *Receiver) messageDisposition(ctx context.Context, msg *Message, state encoding.DeliveryState) error {
// settling a message that's already settled (sender-settled or otherwise) will have a nil rcv.
// which means that r will be nil. you MUST NOT dereference r if msg.settled == true
if msg.settled {
return nil
}
debug.Assert(r != nil)
// NOTE: we MUST add to the in-flight map before sending the disposition. if not, it's possible
// to receive the ack'ing disposition frame *before* the in-flight map has been updated which
// will cause the below <-wait to never trigger.
var wait chan error
if r.l.receiverSettleMode != nil && *r.l.receiverSettleMode == ReceiverSettleModeSecond {
debug.Log(3, "TX (Receiver %p): delivery ID %d is in flight", r, msg.deliveryID)
wait = r.inFlight.add(msg)
}
if err := r.sendDisposition(ctx, msg.deliveryID, nil, state); err != nil {
return err
}
if wait == nil {
// mode first, there will be no settlement ack
msg.onSettlement()
r.deleteUnsettled()
r.onSettlement(1)
return nil
}
select {
case err := <-wait:
// err has three possibilities
// - nil, meaning the peer acknowledged the settlement
// - an *Error, meaning the peer rejected the message with a provided error
// - a non-AMQP error. this comes from calls to inFlight.clear() during mux unwind.
// only for the first two cases is the message considered settled
if amqpErr := (&Error{}); err == nil || errors.As(err, &amqpErr) {
debug.Log(3, "RX (Receiver %p): delivery ID %d has been settled", r, msg.deliveryID)
// we've received confirmation of disposition
return err
}
debug.Log(3, "RX (Receiver %p): error settling delivery ID %d: %v", r, msg.deliveryID, err)
return err
case <-ctx.Done():
// didn't receive the ack in the time allotted, leave message as unsettled
// TODO: if the ack arrives later, we need to remove the message from the unsettled map and reclaim the credit
return ctx.Err()
}
}