in src/ReceivingAmqpLink.cs [492:561]
protected override void OnProcessTransfer(Delivery delivery, Transfer transfer, Frame frame)
{
Fx.Assert(delivery == null || delivery == this.currentMessage, "The delivery must be null or must be the same as the current message.");
// Whether resumed or not, an aborted delivery is considered implicitly settled. Cleanup any pending state and return.
if (delivery.Aborted)
{
this.currentMessage?.Dispose();
this.currentMessage = null;
this.RemoveUnsettledDeliveryFromTerminusStoreIfNeeded(delivery.DeliveryTag);
return;
}
if (this.Settings.MaxMessageSize.HasValue && this.Settings.MaxMessageSize.Value > 0)
{
ulong size = (ulong)(this.currentMessage.BytesTransfered + frame.Payload.Count);
if (size > this.Settings.MaxMessageSize.Value)
{
if (this.IsClosing())
{
// The closing sequence has been started, so any
// transfer is meaningless, so we can treat them as no-op
return;
}
throw new AmqpException(AmqpErrorCode.MessageSizeExceeded,
AmqpResources.GetString(AmqpResources.AmqpMessageSizeExceeded, this.currentMessage.DeliveryId.Value, size, this.Settings.MaxMessageSize.Value));
}
}
Fx.Assert(this.currentMessage != null, "Current message must have been created!");
ArraySegment<byte> payload = frame.Payload;
frame.RawByteBuffer.AdjustPosition(payload.Offset, payload.Count);
frame.RawByteBuffer.AddReference(); // Message also owns the buffer from now on
this.currentMessage.AddPayload(frame.RawByteBuffer, !transfer.More());
if (!transfer.More())
{
AmqpMessage message = this.currentMessage;
this.currentMessage = null;
AmqpTrace.Provider.AmqpReceiveMessage(this, message.DeliveryId.Value, message.Segments);
if (delivery.Resume && this.IsRecoverable)
{
if (delivery.State.IsTerminal() ||
(delivery.State.Transactional() && ((TransactionalState)delivery.State).Outcome.IsTerminal()))
{
// If the sender sends the delivery state is settled and the delivery state is Terminal,
// it means that the both peers concur on the terminal state of the delivery. Hence it
// should be good to cleanup any pending state for the delivery and consider it settled.
if (delivery.Settled)
{
this.RemoveUnsettledDeliveryFromTerminusStoreIfNeeded(delivery.DeliveryTag);
return;
}
// If the sender sends the delivery state as not settled but the sender indicates the
// delivery has reached its terminal state, it can only happen because the peers do not
// agree on the terminal state. But since the sender's view on this is final,update
// any state on our side and send a pendingDisposition with the senders Terminal State
// and settle the delivery.
this.DisposeDelivery(delivery, settled: true, delivery.State, noFlush: false);
return;
}
}
this.OnReceiveMessage(message);
}
}