in client/src/main/java/org/apache/qpid/client/AMQSession.java [1105:1246]
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal)
throws JMSException
{
checkNotClosed();
Topic origTopic = checkValidTopic(topic, true);
AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
if (dest.getDestSyntax() == DestSyntax.ADDR && !isResolved(dest))
{
try
{
resolveAddress(dest,false,noLocal);
if (dest.getAddressType() != AMQDestination.TOPIC_TYPE)
{
throw new JMSException("Durable subscribers can only be created for Topics");
}
}
catch(QpidException e)
{
throw toJMSException("Error when verifying destination",e);
}
catch(TransportException e)
{
throw toJMSException("Error when verifying destination", e);
}
}
String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
_subscriberDetails.lock();
try
{
TopicSubscriberAdaptor<C> subscriber = _subscriptions.get(name);
// Not subscribed to this name in the current session
if (subscriber == null)
{
// After the address is resolved routing key will not be null.
String topicName = dest.getRoutingKey();
if (_strictAMQP)
{
if (_strictAMQPFATAL)
{
throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
}
else
{
_logger.warn("Unable to determine if subscription already exists for '" + topicName
+ "' for creation durableSubscriber. Requesting queue deletion regardless.");
}
deleteQueue(dest.getAMQQueueName());
}
else
{
Map<String,Object> args = new HashMap<String,Object>();
// We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
// durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
// possible to determine when querying the broker whether there are no arguments or just a non-matching selector
// argument, as specifying null for the arguments when querying means they should not be checked at all
args.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
if(noLocal)
{
args.put(AMQPFilterTypes.NO_LOCAL.getValue(), true);
}
// if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
// says we must trash the subscription.
boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName());
boolean isQueueBoundForTopicAndSelector =
isQueueBound(dest.getExchangeName(),
dest.getAMQQueueName(),
topicName, args);
if (isQueueBound && !isQueueBoundForTopicAndSelector)
{
deleteQueue(dest.getAMQQueueName());
}
else if(isQueueBound) // todo - this is a hack for 0-8/9/9-1 which cannot check if arguments on a binding match
{
try
{
bindQueue(dest.getAMQQueueName(),
dest.getRoutingKey(),
args,
dest.getExchangeName(),
dest,
true);
}
catch(QpidException e)
{
throw toJMSException("Error when checking binding",e);
}
}
}
}
else
{
// Subscribed with the same topic and no current / previous or same selector
if (subscriber.getTopic().equals(topic)
&& ((messageSelector == null && subscriber.getMessageSelector() == null)
|| (messageSelector != null && messageSelector.equals(subscriber.getMessageSelector()))))
{
throw new IllegalStateException("Already subscribed to topic " + topic + " with subscription name " + name
+ (messageSelector != null ? " and selector " + messageSelector : ""));
}
else
{
unsubscribe(name, true);
}
}
_subscriberAccess.lock();
try
{
C consumer = (C) createConsumer(dest, messageSelector, noLocal);
consumer.markAsDurableSubscriber();
subscriber = new TopicSubscriberAdaptor<C>(dest, consumer);
// Save subscription information
_subscriptions.put(name, subscriber);
_reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
}
finally
{
_subscriberAccess.unlock();
}
return subscriber;
}
catch (TransportException e)
{
throw toJMSException("Exception while creating durable subscriber:" + e.getMessage(), e);
}
finally
{
_subscriberDetails.unlock();
}
}