in src/Proton/Engine/Implementation/ProtonReceiver.cs [269:336]
/// <summary>
/// Applies an incoming <see cref="Transfer"/> to this receiver. The transfer is
/// matched to the delivery currently being received, or a new delivery is created
/// and tracked for it. Remote state, settlement and payload are then applied, and
/// on the last transfer of a delivery the credit state is updated and the delivery
/// is marked completed or aborted. Finally the matching read or aborted event is
/// fired, followed by a credit state update when a drain has used up all credit.
/// </summary>
/// <param name="transfer">The transfer performative received from the remote peer.</param>
/// <param name="payload">The bytes carried by the transfer; when null nothing is appended.</param>
/// <param name="delivery">Receives the delivery that the transfer was applied to.</param>
protected override void HandleRemoteTransfer(Transfer transfer, IProtonBuffer payload, out ProtonIncomingDelivery delivery)
{
    // A transfer continues the in-progress delivery when one exists and the
    // transfer either carries no delivery id or carries the very same id.
    bool continuesCurrentDelivery =
        currentDeliveryId != null &&
        (!transfer.HasDeliveryId() || currentDeliveryId.Equals(transfer.DeliveryId));

    if (!continuesCurrentDelivery)
    {
        // First transfer of a new delivery: check the id sequence, then create
        // the delivery and start tracking it as the current one.
        VerifyNewDeliveryIdSequence(transfer, currentDeliveryId);

        delivery = new ProtonIncomingDelivery(this, transfer.DeliveryId, transfer.DeliveryTag)
        {
            MessageFormat = transfer.MessageFormat
        };

        unsettled.Add(transfer.DeliveryId, delivery);
        currentDeliveryId = transfer.DeliveryId;
    }
    else
    {
        delivery = unsettled[(uint)currentDeliveryId];
    }

    delivery.IncrementAndGetTransferCount();

    if (transfer.HasState())
    {
        delivery.RemoteState = transfer.DeliveryState;
    }

    // An aborted transfer is treated the same as a remotely settled one here.
    if (transfer.Settled || transfer.Aborted)
    {
        delivery.RemotelySettled();
    }

    if (payload is not null)
    {
        delivery.AppendTransferPayload(payload);
    }

    bool wasAborted = transfer.Aborted;

    // The delivery is finished once it is aborted or no more transfers follow.
    if (wasAborted || !transfer.More)
    {
        CreditState.DecrementCredit();
        CreditState.IncrementDeliveryCount();
        currentDeliveryId = null;

        if (wasAborted)
        {
            delivery.Aborted();
        }
        else
        {
            delivery.Completed();
        }
    }

    // Every transfer raises exactly one event, finished delivery or not.
    if (wasAborted)
    {
        FireDeliveryAborted(delivery);
    }
    else
    {
        FireDeliveryRead(delivery);
    }

    // Only a drain that has run out of credit needs the follow-up below.
    if (!IsDraining || Credit != 0)
    {
        return;
    }

    drainStateSnapshot = null;
    FireCreditStateUpdated();
}