in src/MessageProducer.cs [216:300]
protected async Task SendAsync(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, bool specifiedTimeToLive)
{
if(null == destination)
{
// See if this producer was created without a destination.
if(null == info.Destination)
{
throw new NotSupportedException();
}
// The producer was created with a destination, but an invalid destination
// was specified.
throw new Apache.NMS.InvalidDestinationException();
}
ActiveMQDestination dest = null;
if(destination == this.info.Destination)
{
dest = destination as ActiveMQDestination;
}
else if(info.Destination == null)
{
dest = ActiveMQDestination.Transform(destination);
}
else
{
throw new NotSupportedException("This producer can only send messages to: " + this.info.Destination.PhysicalName);
}
if(this.ProducerTransformer != null)
{
IMessage transformed = this.ProducerTransformer(this.session, this, message);
if(transformed != null)
{
message = transformed;
}
}
ActiveMQMessage activeMessage = this.messageTransformation.TransformMessage<ActiveMQMessage>(message);
activeMessage.ProducerId = info.ProducerId;
activeMessage.Destination = dest;
activeMessage.NMSDeliveryMode = deliveryMode;
activeMessage.NMSPriority = priority;
// Always set the message Id regardless of the disable flag.
MessageId id = new MessageId();
id.ProducerId = info.ProducerId;
id.ProducerSequenceId = Interlocked.Increment(ref this.producerSequenceId);
activeMessage.MessageId = id;
// Ensure that the source message contains the NMSMessageId of the transformed
// message for correlation purposes.
if (!ReferenceEquals(message, activeMessage))
{
message.NMSMessageId = activeMessage.NMSMessageId;
}
if(!disableMessageTimestamp)
{
activeMessage.NMSTimestamp = DateTime.UtcNow;
}
if(specifiedTimeToLive)
{
activeMessage.NMSTimeToLive = timeToLive;
}
// Ensure there's room left to send this message
if(this.usage != null)
{
usage.WaitForSpace();
}
using(await closedLock.LockAsync().Await())
{
if(closed)
{
throw new ConnectionClosedException();
}
await session.DoSendAsync(dest, activeMessage, this, this.usage, this.RequestTimeout).Await();
}
}