in src/AmqpLink.cs [585:672]
/// <summary>
/// Applies an incoming flow to this link's credit state, notifies credit listeners,
/// and sends a flow back when the peer asked for an echo or a sender-side drain was handled.
/// </summary>
/// <param name="flow">The flow received from the peer.</param>
/// <returns>
/// The amount of link credit added by this flow. This is 0 when the flow's properties were
/// handed to a <c>PropertyReceived</c> subscriber, when this link is a receiver, or when the
/// credit did not grow; it is <c>uint.MaxValue</c> when the flow's link credit is
/// <c>uint.MaxValue</c>.
/// </returns>
public uint ProcessFlow(Flow flow)
{
    uint moreCredit = 0;

    // A flow that carries properties is given to the PropertyReceived subscriber (if there is one)
    // and is then not processed for credit at all.
    if (flow.Properties != null)
    {
        EventHandler propertyHandler = this.PropertyReceived;
        if (propertyHandler != null)
        {
            propertyHandler(flow.Properties, EventArgs.Empty);
            return moreCredit;
        }
    }

    uint flowLinkCredit = flow.LinkCredit();
    bool flowDrain = flow.Drain ?? false;
    lock (this.syncRoot)
    {
        if (this.IsReceiver)
        {
            // A missing Available value is treated as uint.MaxValue.
            this.available = flow.Available ?? uint.MaxValue;
            this.ApplyTempTotalLinkCredit();
            if (this.drain && flowLinkCredit == 0)
            {
                // A drain was pending locally and the flow reports zero credit:
                // clear the local credit and the drain flag.
                this.linkCredit = 0;

                // DeliveryCount is nullable; reading .Value on a flow that omits it would throw
                // InvalidOperationException. Keep the current count in that case.
                if (flow.DeliveryCount.HasValue)
                {
                    this.deliveryCount = flow.DeliveryCount.Value;
                }

                this.drain = false;
            }
        }
        else
        {
            this.drain = flowDrain;
            if (flowLinkCredit != uint.MaxValue)
            {
                if (this.linkCredit == uint.MaxValue)
                {
                    // Leaving the uint.MaxValue credit state: adopt the flow's credit as-is
                    // and report no additional credit.
                    this.linkCredit = flowLinkCredit;
                    moreCredit = 0;
                }
                else
                {
                    // Compare the delivery limit implied by the flow (its delivery count, or 0 when
                    // absent, plus its credit) with the limit implied by the local state.
                    SequenceNumber otherDeliveryLimit = (flow.DeliveryCount ?? 0u) + flowLinkCredit;
                    SequenceNumber thisDeliveryLimit = this.deliveryCount.Value + this.linkCredit;
                    int delta = otherDeliveryLimit - thisDeliveryLimit;
                    if (delta > 0)
                    {
                        moreCredit = (uint)delta;
                        this.linkCredit += moreCredit;
                    }
                    else if (delta < 0)
                    {
                        // Credit shrank; clamp at zero instead of wrapping around.
                        uint reduced = (uint)(-delta);
                        this.linkCredit = reduced > this.linkCredit ? 0 : this.linkCredit - reduced;
                    }
                }
            }
            else
            {
                this.linkCredit = uint.MaxValue;
                moreCredit = uint.MaxValue;
            }
        }
    }

    bool sendFlow = flow.Echo();
    if (moreCredit > 0 || flowDrain)
    {
        // Listeners are invoked outside the lock.
        ArraySegment<byte> txnId = this.GetTxnIdFromFlow(flow);
        this.OnCreditAvailable(0, moreCredit, flowDrain, txnId);
        if (this.linkCredit > 0 || flowDrain)
        {
            this.creditListener?.Invoke(moreCredit, this.drain, txnId);
        }

        if (flowDrain && !this.IsReceiver)
        {
            // After the listeners have run, use up whatever credit is left by advancing the
            // delivery count. Both fields are updated under syncRoot above, so this compound
            // update takes the same lock to stay atomic with respect to those writers.
            lock (this.syncRoot)
            {
                this.deliveryCount += (int)this.linkCredit;
                this.linkCredit = 0;
            }

            // Always send a flow after handling a drain on the sending side.
            sendFlow = true;
        }
    }

    if (sendFlow)
    {
        this.SendFlow(false, this.drain, properties: null);
    }

    return moreCredit;
}