in src/Session.cs [750:820]
void WriteDelivery(Delivery delivery)
{
// Must be called single threaded. Delivery must be on list already
// Responsible for releasing the buffer when done transferring or with a closed link
bool more = true;
while (more)
{
Transfer transfer = new Transfer() { Handle = delivery.Handle };
bool first = delivery.BytesTransfered == 0;
if (first)
{
// initialize properties for first transfer
delivery.DeliveryId = this.outgoingDeliveryId++;
transfer.DeliveryTag = delivery.Tag;
transfer.DeliveryId = delivery.DeliveryId;
transfer.State = delivery.State;
transfer.MessageFormat = delivery.Message.Format;
transfer.Settled = delivery.Settled;
transfer.Batchable = delivery.Batchable;
}
int len = this.connection.SendCommand(this.channel, transfer, first,
delivery.Buffer, delivery.ReservedBufferSize);
delivery.BytesTransfered += len;
delivery.Buffer.Complete(len);
Delivery release = null;
lock (this.ThisLock)
{
this.nextOutgoingId++;
if (this.outgoingWindow > 0)
{
this.outgoingWindow--;
}
if (delivery.Buffer.Length == 0 || delivery.Link.IsClosed)
{
delivery.InProgress = false;
var next = (Delivery)delivery.Next;
if (delivery.Link.IsClosed)
{
release = delivery;
this.outgoingList.Remove(delivery);
}
else if (delivery.Settled)
{
this.outgoingList.Remove(delivery);
}
delivery = next;
}
if (delivery == null)
{
this.writingDelivery = false;
more = false;
}
else if (this.outgoingWindow == 0)
{
delivery.InProgress = false;
this.writingDelivery = false;
more = false;
}
}
if (release != null)
{
Delivery.ReleaseAll(release, release.Link.Error);
}
}
}