in core/src/main/java/flex/messaging/services/messaging/SubscriptionManager.java [476:609]
/**
 * Registers a subscription for {@code clientId} on this manager's destination.
 *
 * <p>The subscription is filed under {@code globalSubscribers} when
 * {@code getSubtopic(subtopicString)} yields {@code null}; otherwise under the
 * {@code TopicSubscription} stored for that subtopic (in the wildcard map when the
 * subtopic contains a wildcard, in the plain subtopic map when it does not). Within
 * that entry the client is stored either in the default map (no selector) or in the
 * map keyed by the selector string. If the client id is already present in the
 * target map, only a warning is logged.
 *
 * <p>The {@code MessageClient} obtained through {@code getMessageClient} is always
 * handed back to {@code releaseMessageClient} when the method exits, whether it
 * completes normally or throws.
 *
 * @param clientId       id of the subscribing client
 * @param selector       selector expression, or {@code null} to subscribe without one
 * @param subtopicString subtopic in string form; converted with {@code getSubtopic}
 * @param endpointId     endpoint id passed to {@code getMessageClient} and, for a
 *                       resubscribe, to {@code MessageClient.resetEndpoint}
 * @param maxFrequency   forwarded unchanged to {@code MessageClient.addSubscription}
 * @throws ServiceException if a different FlexClient already subscribed with this
 *                          client id, if the destination does not allow subtopics or
 *                          wildcard subtopics, or if a {@code MessagingSecurity}
 *                          adapter refuses the subtopic
 */
public void addSubscriber(Object clientId, String selector, String subtopicString, String endpointId, int maxFrequency) {
    final Subtopic parsedSubtopic = getSubtopic(subtopicString);
    MessageClient messageClient = null;
    try {
        // Determine up front whether this is a resubscribe from the same client or a
        // duplicate subscribe from a different client; this must be sampled before
        // the MessageClient is looked up.
        final boolean resubscribe = getSubscriber(clientId) != null;
        messageClient = getMessageClient(clientId, endpointId);
        final FlexClient currentFlexClient = FlexContext.getFlexClient();

        if (resubscribe) {
            // Reject a second FlexClient trying to reuse an existing clientId. When
            // invoked for a remote subscription there is no current FlexClient, so
            // the check is skipped.
            if (currentFlexClient != null
                    && !currentFlexClient.getId().equals(messageClient.getFlexClient().getId())) {
                ServiceException duplicateClient = new ServiceException();
                duplicateClient.setMessage(10559, new Object[]{clientId});
                throw duplicateClient;
            }
            // A genuine resubscribe may arrive over a new endpoint or a new session,
            // so bring the subscription's endpoint push state up to date.
            messageClient.resetEndpoint(endpointId);
        }

        final ServiceAdapter serviceAdapter = destination.getAdapter();
        messageClient.updateLastUse();

        // Step 1: locate (or create) the TopicSubscription this request belongs to.
        final TopicSubscription topicEntry;
        if (parsedSubtopic != null) {
            if (!destination.getServerSettings().getAllowSubtopics()) {
                // The destination does not allow subtopics at all.
                ServiceException noSubtopics = new ServiceException();
                noSubtopics.setMessage(SUBTOPICS_NOT_SUPPORTED, new Object[]{subtopicString, destination.getId()});
                throw noSubtopics;
            }
            if (parsedSubtopic.containsSubtopicWildcard()
                    && destination.getServerSettings().isDisallowWildcardSubtopics()) {
                // The destination does not allow wildcard subtopics.
                ServiceException noWildcards = new ServiceException();
                noWildcards.setMessage(WILDCARD_SUBTOPICS_NOT_ALLOWED, new Object[]{subtopicString, destination.getId()});
                throw noWildcards;
            }
            // Let a MessagingSecurity adapter veto the subscribe.
            if (serviceAdapter instanceof MessagingSecurity) {
                MessagingSecurity gatekeeper = (MessagingSecurity) serviceAdapter;
                if (!gatekeeper.allowSubscribe(parsedSubtopic)) {
                    ServiceException refused = new ServiceException();
                    refused.setMessage(10557, new Object[]{subtopicString});
                    throw refused;
                }
            }

            // Wildcard subscriptions always have to be matched against the producer,
            // so they live in their own map; exact subtopics allow a direct lookup.
            final Map<Subtopic, TopicSubscription> registry = parsedSubtopic.containsSubtopicWildcard()
                    ? subscribersPerSubtopicWildcard
                    : subscribersPerSubtopic;

            synchronized (this) {
                TopicSubscription existing = registry.get(parsedSubtopic);
                if (existing == null) {
                    existing = new TopicSubscription();
                    registry.put(parsedSubtopic, existing);
                }
                topicEntry = existing;
            }
        } else {
            topicEntry = globalSubscribers;
        }

        // Step 2: locate (or create) the client map for this selector.
        final Map<Object, MessageClient> subscribers;
        if (selector != null) {
            // Selector subscriptions are grouped under the selector string.
            synchronized (this) {
                if (topicEntry.selectorSubscriptions == null) {
                    topicEntry.selectorSubscriptions = new ConcurrentHashMap<String, Map<Object, MessageClient>>();
                }
            }
            Map<Object, MessageClient> forSelector = topicEntry.selectorSubscriptions.get(selector);
            if (forSelector == null) {
                synchronized (this) {
                    // Look again under the lock before creating the map.
                    forSelector = topicEntry.selectorSubscriptions.get(selector);
                    if (forSelector == null) {
                        forSelector = new ConcurrentHashMap<Object, MessageClient>();
                        topicEntry.selectorSubscriptions.put(selector, forSelector);
                    }
                }
            }
            subscribers = forSelector;
        } else {
            // No selector: use the entry's default subscription map.
            Map<Object, MessageClient> defaults = topicEntry.defaultSubscriptions;
            if (defaults == null) {
                synchronized (this) {
                    // Look again under the lock before creating the map.
                    defaults = topicEntry.defaultSubscriptions;
                    if (defaults == null) {
                        defaults = new ConcurrentHashMap<Object, MessageClient>();
                        topicEntry.defaultSubscriptions = defaults;
                    }
                }
            }
            subscribers = defaults;
        }

        // Step 3: record the client, unless it is already there.
        if (!subscribers.containsKey(clientId)) {
            messageClient.addSubscription(selector, subtopicString, maxFrequency);
            synchronized (this) {
                // In server-to-server mode, tell cluster peers about the first
                // subscriber for this selector/subtopic. This is done while holding
                // the lock so subscribe and unsubscribe messages reach peers in an
                // order that keeps their subscription state matching the local one.
                if (subscribers.isEmpty()
                        && destination.isClustered()
                        && destination.getServerSettings().getRoutingMode() == RoutingMode.SERVER_TO_SERVER) {
                    sendSubscriptionToPeer(true, selector, subtopicString);
                }
                subscribers.put(clientId, messageClient);
            }

            // Local operation only; timeouts on a remote host are not started until failover.
            monitorTimeout(messageClient);

            // If the MessageClient was newly created, its created-listeners are
            // notified only now that its subscription state has been set up.
            if (!resubscribe) {
                messageClient.notifyCreatedListeners();
            }
        } else if (Log.isWarn()) {
            // Ideally this would be an error, but 2.0 allowed it without one.
            Log.getLogger(LogCategories.MESSAGE_SELECTOR).warn("Client: " + clientId + " already subscribed to: " + destination.getId() + " selector: " + selector + " subtopic: " + subtopicString);
        }
    } finally {
        releaseMessageClient(messageClient);
    }
}