in core/src/main/java/flex/messaging/services/messaging/adapters/JMSAdapter.java [400:484]
public Object manage(CommandMessage commandMessage) {
JMSConsumer consumer = null;
Object clientId = commandMessage.getClientId();
if (commandMessage.getOperation() == CommandMessage.SUBSCRIBE_OPERATION) {
// Keep track of the selector expression.
Object selectorExpression = commandMessage.getHeaders().get(CommandMessage.SELECTOR_HEADER);
// Create a JMSConsumer for this destination and associate it with the client id
if (settings.getDestinationType().equals(TOPIC)) {
MessageClient existingMessageClient = null;
// This could happen when client disconnects without unsubscribing first.
if (topicConsumers.containsKey(clientId)) {
removeConsumer(clientId, true /*unsubscribe*/, false /*invalidate*/, null);
existingMessageClient = messageClients.get(clientId);
}
// Create the consumer.
consumer = new JMSTopicConsumer();
consumer.initialize(settings);
if (selectorExpression != null)
consumer.setSelectorExpression((String) selectorExpression);
// Need to build a subscription name, in case durable subscriptions are used.
((JMSTopicConsumer) consumer).setDurableSubscriptionName(buildSubscriptionName(clientId));
consumer.setMessageReceiver(buildMessageReceiver(consumer));
// Add JMSAdapter as JMS exception and message listener.
consumer.addJMSExceptionListener(this);
consumer.addJMSMessageListener(this);
topicConsumers.put(clientId, consumer);
consumerToClientId.put(consumer, clientId);
// Means client was disconnected without unsubscribing, hence no
// new message client will be created. Make sure the old one is
// wired up with the new JMS consumer properly.
if (existingMessageClient != null)
messageClientCreated(existingMessageClient);
} else if (settings.getDestinationType().equals(QUEUE)) {
MessageClient existingMessageClient = null;
if (queueConsumers.containsKey(clientId)) {
removeConsumer(clientId, true /*unsubscribe*/, false /*invalidate*/, null);
existingMessageClient = messageClients.get(clientId);
}
// Create the consumer.
consumer = new JMSQueueConsumer();
consumer.initialize(settings);
if (selectorExpression != null)
consumer.setSelectorExpression((String) selectorExpression);
consumer.setMessageReceiver(buildMessageReceiver(consumer));
// Add JMSAdapter as JMS exception and message listener.
consumer.addJMSExceptionListener(this);
consumer.addJMSMessageListener(this);
queueConsumers.put(clientId, consumer);
consumerToClientId.put(consumer, clientId);
// Means client was disconnected without unsubscribing, hence no
// new message client will be created. Make sure the old one is
// wired up with the new JMS consumer properly.
if (existingMessageClient != null)
messageClientCreated(existingMessageClient);
}
} else if (commandMessage.getOperation() == CommandMessage.UNSUBSCRIBE_OPERATION) {
// Determines if the durable subscription should be unsubscribed
// when the JMS consumer is removed.
boolean unsubscribe = true;
boolean preserveDurable = false;
if (commandMessage.getHeader(CommandMessage.PRESERVE_DURABLE_HEADER) != null)
preserveDurable = ((Boolean) (commandMessage.getHeader(CommandMessage.PRESERVE_DURABLE_HEADER))).booleanValue();
// Don't destroy a durable subscription if the MessageClient's session has been invalidated.
// or this is a JMS durable connection that has requested to be undestroyed
if (commandMessage.getHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER) != null
&& ((Boolean) commandMessage.getHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER)).booleanValue()
|| preserveDurable)
unsubscribe = false;
removeConsumer(clientId, unsubscribe, false, null);
}
// CommandMessage.POLL_OPERATION handling is left to the Endpoint
// hence not handled by this adapter.
return null;
}