public TopicSubscriber createDurableSubscriber()

in client/src/main/java/org/apache/qpid/client/AMQSession.java [1105:1246]


    /**
     * Creates a durable subscriber on {@code topic} registered under the subscription name {@code name}.
     *
     * <p>The call proceeds in these stages:
     * <ol>
     *   <li>{@code checkNotClosed()} and {@code checkValidTopic(topic, true)} are invoked, then a durable
     *       {@link AMQTopic} is derived from the validated topic, the subscription name and the connection.</li>
     *   <li>If that destination uses the ADDR syntax and is not yet resolved, it is resolved; anything whose
     *       address type is not {@code AMQDestination.TOPIC_TYPE} is rejected.</li>
     *   <li>A {@code null} selector, or one that is empty after trimming, is treated as "no selector".</li>
     *   <li>While holding {@code _subscriberDetails}:
     *     <ul>
     *       <li>if this session already tracks a subscription with that name, an identical topic/selector
     *           pair is refused with an {@code IllegalStateException}; otherwise the old subscription is
     *           removed through {@code unsubscribe(name, true)};</li>
     *       <li>if the name is not tracked, the backing queue is either deleted (strict AMQP mode, or when
     *           it is bound to the exchange but not with this routing key and arguments) or re-bound (when
     *           the broker reports a matching binding).</li>
     *     </ul>
     *   </li>
     *   <li>While additionally holding {@code _subscriberAccess}, a consumer is created, marked as a durable
     *       subscriber, wrapped in a {@code TopicSubscriberAdaptor} and recorded in both
     *       {@code _subscriptions} and {@code _reverseSubscriptionMap}.</li>
     * </ol>
     *
     * @param topic    the topic to subscribe to
     * @param name     the durable subscription name; used as the key in the session's subscription map
     * @param selector the message selector; {@code null} or blank means no selector
     * @param noLocal  forwarded to address resolution and consumer creation; when {@code true} a NO_LOCAL
     *                 entry is also added to the binding arguments
     * @return the newly created subscriber
     * @throws JMSException if the destination is not a topic, if a {@code QpidException} or
     *                      {@code TransportException} is raised while resolving, binding or creating the
     *                      consumer, or if the same topic and selector are already subscribed under
     *                      {@code name} in this session
     * @throws UnsupportedOperationException if both {@code _strictAMQP} and {@code _strictAMQPFATAL} are set
     *                      and no subscription with this name is tracked by the session
     */
    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal)
            throws JMSException
    {
        checkNotClosed();
        Topic validatedTopic = checkValidTopic(topic, true);
        AMQTopic durableTopic = AMQTopic.createDurableTopic(validatedTopic, name, _connection);

        // Address-style destinations have to be resolved before their type can be inspected.
        boolean needsResolving = durableTopic.getDestSyntax() == DestSyntax.ADDR && !isResolved(durableTopic);
        if (needsResolving)
        {
            try
            {
                resolveAddress(durableTopic, false, noLocal);
                if (AMQDestination.TOPIC_TYPE != durableTopic.getAddressType())
                {
                    throw new JMSException("Durable subscribers can only be created for Topics");
                }
            }
            catch (QpidException e)
            {
                throw toJMSException("Error when verifying destination", e);
            }
            catch (TransportException e)
            {
                throw toJMSException("Error when verifying destination", e);
            }
        }

        // Normalise the selector: a blank selector is the same as having none.
        final String effectiveSelector;
        if (selector != null && selector.trim().length() != 0)
        {
            effectiveSelector = selector;
        }
        else
        {
            effectiveSelector = null;
        }

        _subscriberDetails.lock();
        try
        {
            TopicSubscriberAdaptor<C> existing = _subscriptions.get(name);

            if (existing != null)
            {
                // The name is already in use in this session: it is only an error when both the topic and
                // the selector are unchanged. The selector is queried only once the topic has matched.
                boolean alreadySubscribed = false;
                if (existing.getTopic().equals(topic))
                {
                    if (effectiveSelector == null)
                    {
                        alreadySubscribed = (existing.getMessageSelector() == null);
                    }
                    else
                    {
                        alreadySubscribed = effectiveSelector.equals(existing.getMessageSelector());
                    }
                }

                if (alreadySubscribed)
                {
                    String selectorSuffix = "";
                    if (effectiveSelector != null)
                    {
                        selectorSuffix = " and selector " + effectiveSelector;
                    }
                    throw new IllegalStateException("Already subscribed to topic " + topic
                            + " with subscription name " + name + selectorSuffix);
                }

                // Something differs, so the previous subscription is dropped before re-creating it.
                unsubscribe(name, true);
            }
            else
            {
                // No subscription under this name in the current session.
                // Once the address has been resolved the routing key is never null.
                String routingKey = durableTopic.getRoutingKey();

                if (_strictAMQP)
                {
                    if (_strictAMQPFATAL)
                    {
                        throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
                    }

                    _logger.warn("Unable to determine if subscription already exists for '" + routingKey
                                    + "' for creation durableSubscriber. Requesting queue deletion regardless.");
                    deleteQueue(durableTopic.getAMQQueueName());
                }
                else
                {
                    Map<String,Object> bindingArgs = new HashMap<String,Object>();

                    // The selector argument is sent unconditionally (as "" when absent). Passing null
                    // arguments to the broker query means "do not check arguments", so without an explicit
                    // entry a removed selector could not be told apart from a merely different one.
                    String selectorArg = (effectiveSelector != null) ? effectiveSelector : "";
                    bindingArgs.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), selectorArg);
                    if (noLocal)
                    {
                        bindingArgs.put(AMQPFilterTypes.NO_LOCAL.getValue(), true);
                    }

                    // Per the JMS spec, a queue bound to the exchange but NOT for this topic and selector
                    // means the old subscription must be discarded.
                    boolean boundAtAll = isQueueBound(durableTopic.getExchangeName(), durableTopic.getAMQQueueName());
                    boolean boundForTopicAndSelector =
                            isQueueBound(durableTopic.getExchangeName(),
                                         durableTopic.getAMQQueueName(),
                                         routingKey, bindingArgs);

                    if (boundAtAll)
                    {
                        if (!boundForTopicAndSelector)
                        {
                            deleteQueue(durableTopic.getAMQQueueName());
                        }
                        else
                        {
                            // todo - hack for 0-8/9/9-1, which cannot check whether binding arguments match
                            try
                            {
                                bindQueue(durableTopic.getAMQQueueName(),
                                          durableTopic.getRoutingKey(),
                                          bindingArgs,
                                          durableTopic.getExchangeName(),
                                          durableTopic,
                                          true);
                            }
                            catch (QpidException e)
                            {
                                throw toJMSException("Error when checking binding", e);
                            }
                        }
                    }
                }
            }

            TopicSubscriberAdaptor<C> created;
            _subscriberAccess.lock();
            try
            {
                C durableConsumer = (C) createConsumer(durableTopic, effectiveSelector, noLocal);
                durableConsumer.markAsDurableSubscriber();
                created = new TopicSubscriberAdaptor<C>(durableTopic, durableConsumer);

                // Record the subscription in both directions: name -> subscriber and consumer -> name.
                _subscriptions.put(name, created);
                _reverseSubscriptionMap.put(created.getMessageConsumer(), name);
            }
            finally
            {
                _subscriberAccess.unlock();
            }

            return created;
        }
        catch (TransportException e)
        {
            throw toJMSException("Exception while creating durable subscriber:" + e.getMessage(), e);
        }
        finally
        {
            _subscriberDetails.unlock();
        }
    }