in src/MessageConsumer.cs [91:180]
internal MessageConsumer(Session session, ConsumerId id, ActiveMQDestination destination,
String name, String selector, int prefetch, int maxPendingMessageCount,
bool noLocal, bool browser, bool dispatchAsync )
{
if(destination == null)
{
throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
}
else if(destination.PhysicalName == null)
{
throw new InvalidDestinationException("The destination object was not given a physical name.");
}
else if (destination.IsTemporary)
{
String physicalName = destination.PhysicalName;
if(String.IsNullOrEmpty(physicalName))
{
throw new InvalidDestinationException("Physical name of Destination should be valid: " + destination);
}
String connectionID = session.Connection.ConnectionId.Value;
if(physicalName.IndexOf(connectionID) < 0)
{
throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
}
if(!session.Connection.IsTempDestinationActive(destination as ActiveMQTempDestination))
{
throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
}
}
this.session = session;
this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
this.messageTransformation = this.session.Connection.MessageTransformation;
if(session.Connection.MessagePrioritySupported)
{
this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
}
else
{
this.unconsumedMessages = new FifoMessageDispatchChannel();
}
this.info = new ConsumerInfo();
this.info.ConsumerId = id;
this.info.Destination = destination;
this.info.SubscriptionName = name;
this.info.Selector = selector;
this.info.PrefetchSize = prefetch;
this.info.MaximumPendingMessageLimit = maxPendingMessageCount;
this.info.NoLocal = noLocal;
this.info.Browser = browser;
this.info.DispatchAsync = dispatchAsync;
this.info.Retroactive = session.Retroactive;
this.info.Exclusive = session.Exclusive;
this.info.Priority = session.Priority;
this.info.ClientId = session.Connection.ClientId;
// If the destination contained a URI query, then use it to set public properties
// on the ConsumerInfo
if(destination.Options != null)
{
// Get options prefixed with "consumer.*"
StringDictionary options = URISupport.GetProperties(destination.Options, "consumer.");
// Extract out custom extension options "consumer.nms.*"
StringDictionary customConsumerOptions = URISupport.ExtractProperties(options, "nms.");
URISupport.SetProperties(this.info, options);
URISupport.SetProperties(this, customConsumerOptions, "nms.");
}
this.optimizeAcknowledge = session.Connection.OptimizeAcknowledge &&
session.IsAutoAcknowledge && !this.info.Browser;
if (this.optimizeAcknowledge) {
this.optimizeAcknowledgeTimeOut = session.Connection.OptimizeAcknowledgeTimeOut;
OptimizedAckScheduledAckInterval = session.Connection.OptimizedAckScheduledAckInterval;
}
this.info.OptimizedAcknowledge = this.optimizeAcknowledge;
this.failoverRedeliveryWaitPeriod = session.Connection.ConsumerFailoverRedeliveryWaitPeriod;
this.nonBlockingRedelivery = session.Connection.NonBlockingRedelivery;
this.transactedIndividualAck = session.Connection.TransactedIndividualAck ||
session.Connection.MessagePrioritySupported ||
this.nonBlockingRedelivery;
}