in receiver.go [141:169]
// Receive returns the next message for this receiver.
//
// A message already available via r.Prefetched() is returned immediately.
// Otherwise the call blocks until one of the following happens:
//   - the message queue signals that it holds a message, which is dequeued and returned;
//   - the link's done channel is closed, in which case any message that arrived
//     in the meantime is returned, else r.l.doneErr;
//   - ctx is done, in which case ctx.Err() is returned.
//
// opts is not consulted by this method body.
func (r *Receiver) Receive(ctx context.Context, opts *ReceiveOptions) (*Message, error) {
	// fast path: nothing to wait for if a message is already on hand
	ready := r.Prefetched()
	if ready != nil {
		return ready, nil
	}

	// slow path: block until a message shows up, the link ends, or the caller gives up
	select {
	case queue := <-r.messagesQ.Wait():
		next := queue.Dequeue()
		debug.Assert(next != nil)
		debug.Log(3, "RX (Receiver %p): received delivery ID %d", r, next.deliveryID)
		// hand the queue back before doing any settlement bookkeeping
		r.messagesQ.Release(queue)
		if next.settled {
			r.onSettlement(1)
		}
		return next, nil

	case <-r.l.done:
		// select picks among ready cases in no defined order, so messages may have been
		// queued after the fast-path check above and before the link was closed. a closed
		// link can't receive anything further, so take one last look for such stragglers
		// before reporting the link's terminal error.
		leftover := r.Prefetched()
		if leftover != nil {
			return leftover, nil
		}
		return nil, r.l.doneErr

	case <-ctx.Done():
		return nil, ctx.Err()
	}
}