in src/AmqpSession.cs [778:845]
/// <summary>
/// Applies a received disposition to every delivery in this session's unsettled
/// list whose delivery id lies in the inclusive range [First, Last], then notifies
/// the link of each affected delivery.
/// </summary>
/// <param name="disposition">
/// The received disposition. <c>First</c> is read unconditionally; when <c>Last</c>
/// is not set, the range collapses to the single id <c>First</c>.
/// </param>
/// <remarks>
/// The unsettled list is walked and updated while holding <c>syncRoot</c>. The
/// per-delivery link callbacks and the <c>OnWindowMoved</c> call happen after the
/// lock has been released.
/// </remarks>
public void OnReceiveDisposition(Disposition disposition)
{
    SequenceNumber first = disposition.First.Value;
    SequenceNumber last = disposition.Last ?? first;
    if (last < first)
    {
        // An inverted range should really be a protocol error; the disposition is dropped.
        return;
    }

    List<Delivery> affected = new List<Delivery>();
    int settledTotal = 0;

    lock (this.syncRoot)
    {
        if (first >= this.nextDeliveryId)
        {
            // The range starts at or beyond nextDeliveryId, so there is nothing to update.
            return;
        }

        if (last > this.nextDeliveryId)
        {
            // Cap the upper bound at nextDeliveryId.
            last = this.nextDeliveryId;
        }

        bool isSettled = disposition.Settled();
        Delivery cursor = this.firstUnsettled;
        while (cursor != null)
        {
            Delivery candidate = cursor;
            SequenceNumber id = candidate.DeliveryId.Value;

            if (id < first)
            {
                // Below the range: skip ahead.
                cursor = candidate.Next;
                continue;
            }

            if (id > last)
            {
                // Past the range: stop scanning (presumably the list is ordered by delivery id).
                break;
            }

            // Advance before the node is possibly unlinked from the list below.
            cursor = candidate.Next;

            candidate.Settled = isSettled;
            candidate.State = disposition.State;
            if (isSettled)
            {
                settledTotal++;
                Delivery.Remove(ref this.firstUnsettled, ref this.lastUnsettled, candidate);
            }

            affected.Add(candidate);
        }
    }

    if (affected.Count == 0)
    {
        return;
    }

    // Notify links outside the lock, in the order the deliveries were collected.
    for (int i = 0; i < affected.Count; i++)
    {
        Delivery updated = affected[i];
        updated.Link.OnDisposeDelivery(updated);
    }

    if (settledTotal > 0)
    {
        this.OnWindowMoved(settledTotal);
    }
}