public void start()

in core/src/main/java/flex/messaging/services/messaging/adapters/JMSTopicConsumer.java [81:160]


    public void start() throws NamingException, JMSException {
        super.start();

        // Establish topic
        Topic topic;
        try {
            topic = (Topic) destination;
        } catch (ClassCastException cce) {
            // JMS topic proxy for JMS destination ''{0}'' has a destination type of ''{1}'' which is not Topic.
            MessageException me = new MessageException();
            me.setMessage(JMSConfigConstants.NON_TOPIC_DESTINATION, new Object[]{destinationJndiName, destination.getClass().getName()});
            throw me;
        }

        // Create connection
        TopicConnectionFactory topicFactory;
        try {
            topicFactory = (TopicConnectionFactory) connectionFactory;
            if (connectionCredentials != null)
                connection = topicFactory.createTopicConnection(connectionCredentials.getUsername(), connectionCredentials.getPassword());
            else
                connection = topicFactory.createTopicConnection();
        } catch (ClassCastException cce) {
            // JMS topic proxy for JMS destination ''{0}'' has a connection factory type of ''{1}'' which is not TopicConnectionFactory.
            MessageException me = new MessageException();
            me.setMessage(JMSConfigConstants.NON_TOPIC_FACTORY, new Object[]{destinationJndiName, connectionFactory.getClass().getName()});
            throw me;
        }

        TopicConnection topicConnection = (TopicConnection) connection;

        if (durableConsumers) {
            try {
                if (Log.isDebug())
                    Log.getLogger(JMSAdapter.LOG_CATEGORY).debug("JMS consumer for JMS destination '"
                            + destinationJndiName + "' is setting its underlying connection's client id to "
                            + durableSubscriptionName + " for durable subscription.");

                topicConnection.setClientID(durableSubscriptionName);
            } catch (Exception e) {
                // Try to set the clientID in a seperate thread.
                ExecutorService clientIdSetter = Executors.newSingleThreadExecutor();
                ClientIdSetterCallable cisc = new ClientIdSetterCallable(topicFactory, durableSubscriptionName);
                Future future = clientIdSetter.submit(cisc);
                try {
                    topicConnection = (TopicConnection) future.get();
                } catch (InterruptedException ie) {
                    if (Log.isWarn())
                        Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("The proxied durable JMS subscription with name, "
                                + durableSubscriptionName + " could not set its client id "
                                + "on the topic connection because it was interrupted: "
                                + ie.toString());
                } catch (ExecutionException ee) {
                    // JMS topic consumer for JMS destination ''{0}'' is configured to use durable subscriptions but the application server does not permit javax.jms.Connection.setClientID method needed to support durable subscribers. Set durable property to false.
                    MessageException me = new MessageException();
                    me.setMessage(JMSConfigConstants.DURABLE_SUBSCRIBER_NOT_SUPPORTED, new Object[]{destinationJndiName});
                    throw me;
                }
            }
        }

        // Create topic session on the connection
        session = topicConnection.createTopicSession(false, getAcknowledgeMode());
        TopicSession topicSession = (TopicSession) session;

        // Create subscriber on topic session, handling message selectors and durable subscribers
        if (selectorExpression != null) {
            if (durableConsumers && durableSubscriptionName != null)
                consumer = topicSession.createDurableSubscriber(topic, durableSubscriptionName, selectorExpression, false);
            else
                consumer = topicSession.createSubscriber(topic, selectorExpression, false);
        } else {
            if (durableConsumers && durableSubscriptionName != null)
                consumer = topicSession.createDurableSubscriber(topic, durableSubscriptionName);
            else
                consumer = topicSession.createSubscriber(topic);
        }

        startMessageReceiver();
    }