in src/Listener/ListenerLink.cs [291:345]
/// <summary>
/// Sends a message over this link as a new delivery, consuming one unit of link credit.
/// </summary>
/// <param name="message">The message to send; it is encoded only when <paramref name="buffer"/> is null.</param>
/// <param name="buffer">The buffer to attach to the delivery, or null to use the result of encoding <paramref name="message"/>.</param>
/// <param name="userToken">Caller-supplied object stored on the delivery as its user token.</param>
/// <returns>The value of the link credit counter after it has been decremented for this delivery.</returns>
/// <exception cref="AmqpException">Thrown with <c>ErrorCode.NotAllowed</c> when this link's Role flag is set.</exception>
internal uint SendMessageInternal(Message message, ByteBuffer buffer, object userToken)
{
    // A link whose Role flag is set is rejected up front (the error text calls it a receiving link).
    if (this.Role)
    {
        throw new AmqpException(ErrorCode.NotAllowed, "Cannot send a message over a receiving link.");
    }

    this.ThrowIfDetaching("Send");

    uint sequenceNumber;
    uint creditLeft;

    // Reserve a delivery number and one unit of credit atomically with respect to
    // other code that synchronizes on ThisLock.
    // NOTE(review): credit is decremented without a zero check; if the field is unsigned
    // it would wrap around when sending with no credit - confirm callers guard against this.
    lock (this.ThisLock)
    {
        sequenceNumber = this.deliveryCount;
        this.deliveryCount++;
        this.credit--;
        creditLeft = this.credit;
    }

    try
    {
        Delivery outgoing = new Delivery();
        outgoing.Handle = this.Handle;
        outgoing.Message = message;
        outgoing.Buffer = buffer ?? message.Encode();
        outgoing.Link = this;
        outgoing.Settled = this.SettleOnSend;
        // Outcome notifications are routed to the OnStateChange of the delivery
        // attached to the callback's second argument, passing the third argument along.
        outgoing.OnOutcome = (a, b, c, d) => b.Delivery.OnStateChange(c);
        outgoing.UserToken = userToken;

        // Notify the connection-level handler, if one is registered and interested,
        // before the tag is defaulted and the delivery is handed to the session.
        IHandler eventHandler = this.Session.Connection.Handler;
        bool shouldNotify = eventHandler != null && eventHandler.CanHandle(EventId.SendDelivery);
        if (shouldNotify)
        {
            eventHandler.Handle(Event.Create(EventId.SendDelivery, this.Session.Connection, this.Session, this, context: outgoing));
        }

        // Only generate a tag from the reserved delivery number when none is set at this point
        // (presumably the handler above may have assigned one - confirm).
        if (outgoing.Tag == null)
        {
            outgoing.Tag = Delivery.GetDeliveryTag(sequenceNumber);
        }

        this.Session.SendDelivery(outgoing);
        return creditLeft;
    }
    catch
    {
        // The delivery was not sent: give back the credit and the delivery number
        // reserved above, then let the original exception propagate.
        lock (this.ThisLock)
        {
            this.credit++;
            this.deliveryCount--;
        }

        throw;
    }
}