in src/ReceiverLink.cs [533:582]
void UpdateDelivery(MessageDelivery messageDelivery, Outcome outcome, DeliveryState deliveryState)
{
Delivery delivery = messageDelivery.Delivery;
if (delivery == null)
{
throw new InvalidOperationException("Message was not delivered yet.");
}
if (delivery == null || delivery.Link != this)
{
throw new InvalidOperationException("Message was not received by this link.");
}
if (!delivery.Settled)
{
DeliveryState state = outcome != null ? outcome : deliveryState;
bool settled = true;
#if NETFX || NETFX40 || NETSTANDARD2_0
if (outcome != null)
{
var txnState = Amqp.Transactions.ResourceManager.GetTransactionalStateAsync(this).Result;
if (txnState != null)
{
txnState.Outcome = outcome;
state = txnState;
settled = false;
}
}
#endif
this.Session.DisposeDelivery(true, delivery, state, settled);
}
lock (this.ThisLock)
{
this.restored++;
this.pending--;
if (this.flowThreshold >= 0 && this.restored >= this.flowThreshold)
{
// total credit may be reduced. restore to what is allowed
int delta = Math.Min(this.restored, this.totalCredit - this.credit - this.pending);
if (delta > 0)
{
this.credit += delta;
this.SendFlow(this.deliveryCount, (uint)this.credit, false);
}
this.restored = 0;
}
}
}