in core/src/main/java/flex/messaging/services/messaging/adapters/JMSTopicConsumer.java [81:160]
/**
 * Starts this topic consumer: casts the configured destination and connection
 * factory to their topic-specific types, opens a topic connection (with the
 * connection credentials when present), optionally assigns a client id for
 * durable subscriptions, creates a non-transacted topic session and a
 * (durable or plain) subscriber, and finally starts the message receiver.
 *
 * @throws NamingException declared on the signature; propagated from {@code super.start()}
 *         or other calls not visible in this method.
 * @throws JMSException if creating the connection, session or subscriber fails.
 * @throws MessageException (thrown undeclared) if the destination is not a {@code Topic},
 *         the connection factory is not a {@code TopicConnectionFactory}, or the client id
 *         needed for durable subscriptions could not be set.
 */
public void start() throws NamingException, JMSException {
    super.start();

    // Establish topic
    Topic topic;
    try {
        topic = (Topic) destination;
    } catch (ClassCastException cce) {
        // JMS topic proxy for JMS destination ''{0}'' has a destination type of ''{1}'' which is not Topic.
        MessageException me = new MessageException();
        me.setMessage(JMSConfigConstants.NON_TOPIC_DESTINATION, new Object[]{destinationJndiName, destination.getClass().getName()});
        throw me;
    }

    // Create connection
    TopicConnectionFactory topicFactory;
    try {
        topicFactory = (TopicConnectionFactory) connectionFactory;
        if (connectionCredentials != null) {
            connection = topicFactory.createTopicConnection(connectionCredentials.getUsername(), connectionCredentials.getPassword());
        } else {
            connection = topicFactory.createTopicConnection();
        }
    } catch (ClassCastException cce) {
        // JMS topic proxy for JMS destination ''{0}'' has a connection factory type of ''{1}'' which is not TopicConnectionFactory.
        MessageException me = new MessageException();
        me.setMessage(JMSConfigConstants.NON_TOPIC_FACTORY, new Object[]{destinationJndiName, connectionFactory.getClass().getName()});
        throw me;
    }

    TopicConnection topicConnection = (TopicConnection) connection;
    if (durableConsumers) {
        try {
            if (Log.isDebug()) {
                Log.getLogger(JMSAdapter.LOG_CATEGORY).debug("JMS consumer for JMS destination '"
                        + destinationJndiName + "' is setting its underlying connection's client id to "
                        + durableSubscriptionName + " for durable subscription.");
            }
            topicConnection.setClientID(durableSubscriptionName);
        } catch (Exception e) {
            // Setting the client id on this thread failed; retry on a separate thread.
            ExecutorService clientIdSetter = Executors.newSingleThreadExecutor();
            try {
                ClientIdSetterCallable cisc = new ClientIdSetterCallable(topicFactory, durableSubscriptionName);
                Future<?> pending = clientIdSetter.submit(cisc);
                TopicConnection replacement = (TopicConnection) pending.get();

                // The callable is only handed the factory, so the connection it returns
                // supersedes the one opened above. Release the old one (best effort) and
                // make the field track the connection the session will actually use.
                // NOTE(review): assumes the superclass starts/closes the 'connection' field - confirm.
                try {
                    topicConnection.close();
                } catch (JMSException closeFailure) {
                    if (Log.isWarn()) {
                        Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("JMS consumer for JMS destination '"
                                + destinationJndiName + "' could not close its superseded topic connection: "
                                + closeFailure.toString());
                    }
                }
                topicConnection = replacement;
                connection = replacement;
            } catch (InterruptedException ie) {
                // Restore the interrupt flag so callers further up can observe it.
                Thread.currentThread().interrupt();
                if (Log.isWarn()) {
                    Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("The proxied durable JMS subscription with name, "
                            + durableSubscriptionName + " could not set its client id "
                            + "on the topic connection because it was interrupted: "
                            + ie.toString());
                }
            } catch (ExecutionException ee) {
                // Record the underlying failure before it is replaced by the configuration error.
                if (Log.isWarn()) {
                    Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("JMS consumer for JMS destination '"
                            + destinationJndiName + "' failed to set its client id on a separate thread: "
                            + ee.toString());
                }
                // JMS topic consumer for JMS destination ''{0}'' is configured to use durable subscriptions but the application server does not permit javax.jms.Connection.setClientID method needed to support durable subscribers. Set durable property to false.
                MessageException me = new MessageException();
                me.setMessage(JMSConfigConstants.DURABLE_SUBSCRIBER_NOT_SUPPORTED, new Object[]{destinationJndiName});
                throw me;
            } finally {
                // Without this the executor's worker thread lingers after every fallback attempt.
                clientIdSetter.shutdown();
            }
        }
    }

    // Create topic session on the connection
    session = topicConnection.createTopicSession(false, getAcknowledgeMode());
    TopicSession topicSession = (TopicSession) session;

    // Create subscriber on topic session, handling message selectors and durable subscribers
    boolean durable = durableConsumers && durableSubscriptionName != null;
    if (selectorExpression != null) {
        if (durable) {
            consumer = topicSession.createDurableSubscriber(topic, durableSubscriptionName, selectorExpression, false);
        } else {
            consumer = topicSession.createSubscriber(topic, selectorExpression, false);
        }
    } else {
        if (durable) {
            consumer = topicSession.createDurableSubscriber(topic, durableSubscriptionName);
        } else {
            consumer = topicSession.createSubscriber(topic);
        }
    }

    startMessageReceiver();
}