func()

in receiver.go [312:365]


func (r *Receiver) messageDisposition(ctx context.Context, msg *Message, state encoding.DeliveryState) error {
	// settling a message that's already settled (sender-settled or otherwise) will have a nil rcv.
	// which means that r will be nil. you MUST NOT dereference r if msg.settled == true
	if msg.settled {
		return nil
	}

	debug.Assert(r != nil)

	// NOTE: we MUST add to the in-flight map before sending the disposition. if not, it's possible
	// to receive the ack'ing disposition frame *before* the in-flight map has been updated which
	// will cause the below <-wait to never trigger.

	var wait chan error
	if r.l.receiverSettleMode != nil && *r.l.receiverSettleMode == ReceiverSettleModeSecond {
		debug.Log(3, "TX (Receiver %p): delivery ID %d is in flight", r, msg.deliveryID)
		wait = r.inFlight.add(msg)
	}

	if err := r.sendDisposition(ctx, msg.deliveryID, nil, state); err != nil {
		return err
	}

	if wait == nil {
		// mode first, there will be no settlement ack
		msg.onSettlement()
		r.deleteUnsettled()
		r.onSettlement(1)
		return nil
	}

	select {
	case err := <-wait:
		// err has three possibilities
		//   - nil, meaning the peer acknowledged the settlement
		//   - an *Error, meaning the peer rejected the message with a provided error
		//   - a non-AMQP error. this comes from calls to inFlight.clear() during mux unwind.
		// only for the first two cases is the message considered settled

		if amqpErr := (&Error{}); err == nil || errors.As(err, &amqpErr) {
			debug.Log(3, "RX (Receiver %p): delivery ID %d has been settled", r, msg.deliveryID)
			// we've received confirmation of disposition
			return err
		}

		debug.Log(3, "RX (Receiver %p): error settling delivery ID %d: %v", r, msg.deliveryID, err)
		return err

	case <-ctx.Done():
		// didn't receive the ack in the time allotted, leave message as unsettled
		// TODO: if the ack arrives later, we need to remove the message from the unsettled map and reclaim the credit
		return ctx.Err()
	}
}