in core/src/main/java/flex/messaging/services/MessageService.java [801:1008]
protected Message manageSubscriptions(CommandMessage command) {
Message replyMessage = null;
MessageDestination destination = (MessageDestination) getDestination(command);
SubscriptionManager subscriptionManager = destination.getSubscriptionManager();
Object clientId = command.getClientId();
String endpointId = (String) command.getHeader(Message.ENDPOINT_HEADER);
String subtopicString = (String) command.getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME);
ServiceAdapter adapter = destination.getAdapter();
if (command.getOperation() == CommandMessage.SUBSCRIBE_OPERATION) {
String selectorExpr = (String) command.getHeader(CommandMessage.SELECTOR_HEADER);
getMessageBroker().inspectChannel(command, destination);
// Give MessagingAdapter a chance to block the subscribe.
if ((adapter instanceof MessagingAdapter)) {
MessagingSecurityConstraintManager manager = ((MessagingAdapter) adapter).getSecurityConstraintManager();
if (manager != null)
manager.assertSubscribeAuthorization();
}
try {
/*
* This allows parallel add/remove subscribe calls (protected by the
* concurrent hash table) but prevents us from doing any table mods
* when the getSubscriptionState method is active
*/
subscribeLock.readLock().lock();
if (adapter.handlesSubscriptions()) {
replyMessage = (Message) adapter.manage(command);
} else {
testSelector(selectorExpr, command);
}
/*
* Even if the adapter is managing the subscription, we still need to
* register this with the subscription manager so that we can match the
* endpoint with the clientId. I am not sure I like this though because
* now the subscription is registered both with the adapter and with our
* system so keeping them in sync is potentially problematic. Also, it
* seems like the adapter should have the option to manage endpoints themselves?
*/
// Extract the maxFrequency that might have been specified by the client.
int maxFrequency = processMaxFrequencyHeader(command);
subscriptionManager.addSubscriber(clientId, selectorExpr, subtopicString, endpointId, maxFrequency);
} finally {
subscribeLock.readLock().unlock();
}
if (replyMessage == null)
replyMessage = new AcknowledgeMessage();
} else if (command.getOperation() == CommandMessage.UNSUBSCRIBE_OPERATION) {
// Give MessagingAdapter a chance to block the unsubscribe, as long as
// the subscription has not been invalidated.
if ((adapter instanceof MessagingAdapter) && command.getHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER) == null) {
MessagingSecurityConstraintManager manager = ((MessagingAdapter) adapter).getSecurityConstraintManager();
if (manager != null)
manager.assertSubscribeAuthorization();
}
String selectorExpr = (String) command.getHeader(CommandMessage.SELECTOR_HEADER);
try {
subscribeLock.readLock().lock();
if (adapter.handlesSubscriptions()) {
replyMessage = (Message) adapter.manage(command);
}
subscriptionManager.removeSubscriber(clientId, selectorExpr, subtopicString, endpointId);
} finally {
subscribeLock.readLock().unlock();
}
if (replyMessage == null)
replyMessage = new AcknowledgeMessage();
} else if (command.getOperation() == CommandMessage.MULTI_SUBSCRIBE_OPERATION) {
getMessageBroker().inspectChannel(command, destination);
// Give MessagingAdapter a chance to block the multi subscribe.
if ((adapter instanceof MessagingAdapter)) {
MessagingSecurityConstraintManager manager = ((MessagingAdapter) adapter).getSecurityConstraintManager();
if (manager != null)
manager.assertSubscribeAuthorization();
}
try {
/*
* This allows parallel add/remove subscribe calls (protected by the
* concurrent hash table) but prevents us from doing any table mods
* when the getSubscriptionState method is active
*/
subscribeLock.readLock().lock();
if (adapter.handlesSubscriptions()) {
replyMessage = (Message) adapter.manage(command);
}
// Deals with legacy collection setting
Object[] adds = getObjectArrayFromHeader(command.getHeader(CommandMessage.ADD_SUBSCRIPTIONS));
Object[] rems = getObjectArrayFromHeader(command.getHeader(CommandMessage.REMOVE_SUBSCRIPTIONS));
if (adds != null) {
// Extract the maxFrequency that might have been specified
// by the client for every subscription (selector/subtopic).
int maxFrequency = processMaxFrequencyHeader(command);
for (int i = 0; i < adds.length; i++) {
// Use the maxFrequency by default.
int maxFrequencyPerSubscription = maxFrequency;
String ss = (String) adds[i];
int ix = ss.indexOf(CommandMessage.SUBTOPIC_SEPARATOR);
if (ix != -1) {
String subtopic = (ix == 0 ? null : ss.substring(0, ix));
String selector = null;
String selectorAndMaxFrequency = ss.substring(ix + CommandMessage.SUBTOPIC_SEPARATOR.length());
if (selectorAndMaxFrequency.length() != 0) {
int ix2 = selectorAndMaxFrequency.indexOf(CommandMessage.SUBTOPIC_SEPARATOR);
if (ix2 != -1) {
selector = (ix2 == 0 ? null : selectorAndMaxFrequency.substring(0, ix2));
String maxFrequencyString = selectorAndMaxFrequency.substring(ix2 + CommandMessage.SUBTOPIC_SEPARATOR.length());
if (maxFrequencyString.length() != 0) {
// Choose the minimum of Consumer level maxFrequency and subscription level maxFrequency.
int maxFrequencyCandidate = Integer.parseInt(maxFrequencyString);
maxFrequencyPerSubscription = maxFrequencyPerSubscription == 0 ? maxFrequencyCandidate : Math.min(maxFrequencyPerSubscription, maxFrequencyCandidate);
}
} else {
selector = selectorAndMaxFrequency;
}
}
subscriptionManager.addSubscriber(clientId, selector, subtopic, endpointId, maxFrequencyPerSubscription);
}
// invalid message
}
}
if (rems != null) {
for (int i = 0; i < rems.length; i++) {
String ss = (String) rems[i];
int ix = ss.indexOf(CommandMessage.SUBTOPIC_SEPARATOR);
if (ix != -1) {
String subtopic = (ix == 0 ? null : ss.substring(0, ix));
String selector = null;
String selectorAndMaxFrequency = ss.substring(ix + CommandMessage.SUBTOPIC_SEPARATOR.length());
if (selectorAndMaxFrequency.length() != 0) {
int ix2 = selectorAndMaxFrequency.indexOf(CommandMessage.SUBTOPIC_SEPARATOR);
if (ix2 != -1)
selector = ix2 == 0 ? null : selectorAndMaxFrequency.substring(0, ix2);
else
selector = selectorAndMaxFrequency;
}
subscriptionManager.removeSubscriber(clientId, selector, subtopic, endpointId);
}
}
}
} finally {
subscribeLock.readLock().unlock();
}
if (replyMessage == null)
replyMessage = new AcknowledgeMessage();
} else if (command.getOperation() == CommandMessage.POLL_OPERATION) {
// This code path handles poll messages sent by Consumer.receive().
// This API should not trigger server side waits, so we invoke poll
// and if there are no queued messages for this Consumer instance we
// return an empty acknowledgement immediately.
MessageClient client = null;
try {
client = subscriptionManager.getMessageClient(clientId, endpointId);
if (client != null) {
if (adapter.handlesSubscriptions()) {
List missedMessages = (List) adapter.manage(command);
if (missedMessages != null && !missedMessages.isEmpty()) {
MessageBroker broker = getMessageBroker();
for (Iterator iter = missedMessages.iterator(); iter.hasNext(); )
broker.routeMessageToMessageClient((Message) iter.next(), client);
}
}
FlushResult flushResult = client.getFlexClient().poll(client);
List messagesToReturn = (flushResult != null) ? flushResult.getMessages() : null;
if (messagesToReturn != null && !messagesToReturn.isEmpty()) {
replyMessage = new CommandMessage(CommandMessage.CLIENT_SYNC_OPERATION);
replyMessage.setBody(messagesToReturn.toArray());
} else {
replyMessage = new AcknowledgeMessage();
}
// Adaptive poll wait is never used in responses to Consumer.receive() calls.
} else {
ServiceException se = new ServiceException();
se.setCode(NOT_SUBSCRIBED_CODE);
se.setMessage(NOT_SUBSCRIBED, new Object[]{destination.getId()});
throw se;
}
} finally {
subscriptionManager.releaseMessageClient(client);
}
} else {
ServiceException se = new ServiceException();
se.setMessage(UNKNOWN_COMMAND, new Object[]{new Integer(command.getOperation())});
throw se;
}
return replyMessage;
}