in src/ReceiverLink.cs [372:444]
internal override void OnTransfer(Delivery delivery, Transfer transfer, ByteBuffer buffer)
{
if (delivery == null)
{
delivery = this.deliveryCurrent.Delivery;
AmqpBitConverter.WriteBytes(delivery.Buffer, buffer.Buffer, buffer.Offset, buffer.Length);
}
else
{
this.deliveryCurrent = new MessageDelivery(delivery, transfer.MessageFormat);
buffer.AddReference();
delivery.Buffer = buffer;
lock (this.ThisLock)
{
this.OnDelivery(transfer.DeliveryId);
}
}
if (!transfer.More)
{
delivery.Message = Message.Decode(delivery.Buffer);
delivery.Message.Format = this.deliveryCurrent.MessageFormat;
this.deliveryCurrent = MessageDelivery.None;
IHandler handler = this.Session.Connection.Handler;
if (handler != null && handler.CanHandle(EventId.ReceiveDelivery))
{
handler.Handle(Event.Create(EventId.ReceiveDelivery, this.Session.Connection, this.Session, this, context: delivery));
}
Waiter waiter;
MessageCallback callback = this.onMessage;
lock (this.ThisLock)
{
waiter = (Waiter)this.waiterList.First;
if (waiter != null)
{
this.waiterList.Remove(waiter);
}
else if (callback == null)
{
this.receivedMessages.Add(new MessageNode() { Message = delivery.Message });
return;
}
}
while (waiter != null)
{
if (waiter.Signal(delivery.Message))
{
return;
}
lock (this.ThisLock)
{
waiter = (Waiter)this.waiterList.First;
if (waiter != null)
{
this.waiterList.Remove(waiter);
}
else if (callback == null)
{
this.receivedMessages.Add(new MessageNode() { Message = delivery.Message });
return;
}
}
}
Fx.Assert(waiter == null, "waiter must be null now");
Fx.Assert(callback != null, "callback must not be null now");
callback(this, delivery.Message);
}
}