in src/AmqpLink.cs [1395:1434]
/// <summary>
/// Builds a flow from the link's current credit state and hands it to the session.
/// </summary>
/// <remarks>
/// A call is "forced" when <paramref name="echo"/> is true or <paramref name="properties"/>
/// is non-null; forced calls always build and submit exactly one flow and never touch the
/// <c>sendingFlow</c> counter.
/// Non-forced calls are coalesced through <c>sendingFlow</c>: only the caller that moves the
/// counter from 0 to 1 enters the send loop, every other caller just bumps the counter and
/// returns. The looping caller keeps sending until its decrement brings the counter back to 0,
/// so each send picks up the latest values of the link fields.
/// The flow is only passed to the session while the link state is Opened, OpenSent or
/// OpenReceived; in any other state it is built and discarded.
/// </remarks>
/// <param name="echo">Value copied to the flow's Echo field; true also forces the send.</param>
/// <param name="drain">Stored in the link's drain field; when that field is true the flow's Drain is set to true.</param>
/// <param name="properties">Value copied to the flow's Properties field; non-null also forces the send.</param>
void SendFlowInternal(bool echo, bool drain, Fields properties)
{
    this.drain = drain;
    bool forceSend = echo || properties != null;
    if (!forceSend && Interlocked.Increment(ref this.sendingFlow) != 1)
    {
        // Someone else already owns the send loop; the increment above makes
        // that owner run one more iteration on our behalf.
        return;
    }

    bool completed = false;
    try
    {
        do
        {
            Flow flow = new Flow();
            flow.Handle = this.LocalHandle;
            lock (this.syncRoot)
            {
                // Take the credit-related fields under the lock so the flow
                // carries values that were read together.
                flow.LinkCredit = this.linkCredit;
                flow.Available = this.Available;
                flow.DeliveryCount = this.deliveryCount.Value;
                if (this.drain)
                {
                    flow.Drain = true;
                }
            }

            flow.Echo = echo;
            flow.Properties = properties;

            // Read the state once: evaluating this.State separately for each
            // comparison lets a concurrent state change slip between the reads
            // and make every comparison fail.
            var state = this.State;
            if (state == AmqpObjectState.Opened ||
                state == AmqpObjectState.OpenSent ||
                state == AmqpObjectState.OpenReceived)
            {
                this.Session.SendFlow(flow);
            }
        }
        while (!forceSend && Interlocked.Decrement(ref this.sendingFlow) > 0);

        completed = true;
    }
    finally
    {
        // The counter is only decremented in the loop condition. If the body
        // threw, it would stay above zero and every later non-forced call would
        // return early without sending. Reset it so the next caller can take
        // ownership; requests coalesced into this failed run are dropped and
        // the exception still propagates to this caller.
        if (!completed && !forceSend)
        {
            Interlocked.Exchange(ref this.sendingFlow, 0);
        }
    }
}