protected override void OnProcessTransfer()

in src/ReceivingAmqpLink.cs [492:561]


        /// <summary>
        /// Handles one incoming transfer frame for this link: appends the frame payload to the message
        /// that is currently being assembled and, when the transfer carries no further frames, either
        /// hands the completed message to <c>OnReceiveMessage</c> or, for a resumed delivery that the
        /// sender already reports as terminal, settles it without surfacing the message.
        /// </summary>
        /// <param name="delivery">
        /// The delivery the transfer belongs to. Asserted to be either <c>null</c> or the current message.
        /// When it is <c>null</c> the abort and resume checks are skipped, because there is no delivery
        /// state to inspect.
        /// </param>
        /// <param name="transfer">The transfer performative; its "more" flag decides whether the message is complete.</param>
        /// <param name="frame">The frame whose payload is added to the current message.</param>
        /// <exception cref="AmqpException">
        /// Thrown with <c>AmqpErrorCode.MessageSizeExceeded</c> when the accumulated size would exceed
        /// <c>Settings.MaxMessageSize</c> and the link is not already closing.
        /// </exception>
        protected override void OnProcessTransfer(Delivery delivery, Transfer transfer, Frame frame)
        {
            Fx.Assert(delivery == null || delivery == this.currentMessage, "The delivery must be null or must be the same as the current message.");

            // Whether resumed or not, an aborted delivery is considered implicitly settled. Cleanup any pending state and return.
            // The assertion above permits a null delivery, so guard the dereference instead of throwing a NullReferenceException.
            if (delivery != null && delivery.Aborted)
            {
                this.currentMessage?.Dispose();
                this.currentMessage = null;
                this.RemoveUnsettledDeliveryFromTerminusStoreIfNeeded(delivery.DeliveryTag);
                return;
            }

            // Checked before the first dereference of currentMessage (the size check below reads it).
            // NOTE(review): Fx.Assert is presumably a debug-only check - confirm.
            Fx.Assert(this.currentMessage != null, "Current message must have been created!");

            if (this.Settings.MaxMessageSize.HasValue && this.Settings.MaxMessageSize.Value > 0)
            {
                ulong size = (ulong)(this.currentMessage.BytesTransfered + frame.Payload.Count);
                if (size > this.Settings.MaxMessageSize.Value)
                {
                    if (this.IsClosing())
                    {
                        // The closing sequence has been started, so any
                        // transfer is meaningless, so we can treat them as no-op
                        return;
                    }

                    throw new AmqpException(AmqpErrorCode.MessageSizeExceeded,
                        AmqpResources.GetString(AmqpResources.AmqpMessageSizeExceeded, this.currentMessage.DeliveryId.Value, size, this.Settings.MaxMessageSize.Value));
                }
            }

            // Read the "more" flag once; it drives both the payload call and the completion check.
            bool isLastTransfer = !transfer.More();

            ArraySegment<byte> payload = frame.Payload;
            frame.RawByteBuffer.AdjustPosition(payload.Offset, payload.Count);
            frame.RawByteBuffer.AddReference();    // Message also owns the buffer from now on
            this.currentMessage.AddPayload(frame.RawByteBuffer, isLastTransfer);

            if (!isLastTransfer)
            {
                // More frames are expected for this message; keep accumulating.
                return;
            }

            // The message is complete: detach it from the link before handing it off.
            AmqpMessage message = this.currentMessage;
            this.currentMessage = null;

            AmqpTrace.Provider.AmqpReceiveMessage(this, message.DeliveryId.Value, message.Segments);

            if (delivery != null && delivery.Resume && this.IsRecoverable)
            {
                bool senderReportsTerminalState = delivery.State.IsTerminal() ||
                    (delivery.State.Transactional() && ((TransactionalState)delivery.State).Outcome.IsTerminal());

                if (senderReportsTerminalState)
                {
                    // If the sender reports the delivery as settled and its state is terminal, both
                    // peers concur on the terminal state of the delivery. Hence it is safe to clean up
                    // any pending state for the delivery and consider it settled.
                    if (delivery.Settled)
                    {
                        this.RemoveUnsettledDeliveryFromTerminusStoreIfNeeded(delivery.DeliveryTag);
                        return;
                    }

                    // If the sender reports the delivery as not settled but indicates that it has
                    // reached its terminal state, the peers do not agree on the terminal state.
                    // Since the sender's view on this is final, update any state on our side, send
                    // a disposition with the sender's terminal state and settle the delivery.
                    this.DisposeDelivery(delivery, settled: true, delivery.State, noFlush: false);
                    return;
                }
            }

            this.OnReceiveMessage(message);
        }