in src/AmqpLink.cs [893:979]
internal bool SendDelivery(Delivery delivery)
{
bool settled = delivery.Settled;
bool more = true;
while (more)
{
bool firstTransfer = delivery.BytesTransfered == 0;
Transfer transfer = new Transfer();
transfer.Handle = this.LocalHandle;
transfer.More = more;
transfer.Aborted = delivery.Aborted;
transfer.Resume = delivery.Resume;
if (firstTransfer)
{
transfer.DeliveryId = uint.MaxValue; // reserve the space first
transfer.DeliveryTag = delivery.DeliveryTag;
transfer.MessageFormat = delivery.MessageFormat ?? AmqpConstants.AmqpMessageFormat;
transfer.Batchable = delivery.Batchable;
transfer.State = delivery.State;
if (settled)
{
transfer.Settled = true;
}
if (delivery.TxnId.Array != null)
{
transfer.State = new TransactionalState() { TxnId = delivery.TxnId };
}
}
ByteBuffer payload = null;
if (delivery.Resume && delivery.State.IsTerminal())
{
more = false;
transfer.More = false;
}
else
{
uint maxFrameSize = this.MaxFrameSize == uint.MaxValue ? AmqpConstants.DefaultMaxFrameSize : this.MaxFrameSize;
int overhead = Frame.HeaderSize + transfer.EncodeSize;
if (overhead > maxFrameSize)
{
throw new AmqpException(AmqpErrorCode.FrameSizeTooSmall, null);
}
int payloadSize = (int)maxFrameSize - overhead;
payload = delivery.GetPayload(payloadSize, out more);
transfer.More = more;
if (payload == null)
{
if (firstTransfer)
{
throw new AmqpException(AmqpErrorCode.NotAllowed, AmqpResources.AmqpEmptyMessageNotAllowed);
}
Fx.Assert(!more, "More flag is set but a null payload is returned.");
break;
}
}
if (!this.Session.TrySendTransfer(firstTransfer ? delivery : null, transfer, payload))
{
payload?.Dispose();
more = true;
break;
}
if (payload != null)
{
delivery.CompletePayload(payload.Length);
}
}
if (!more)
{
AmqpTrace.Provider.AmqpSentMessage(this, delivery.DeliveryId.Value, delivery.BytesTransfered);
if (delivery.Settled)
{
delivery.State = AmqpConstants.AcceptedOutcome;
this.OnDisposeDeliveryInternal(delivery);
}
}
return !more;
}