public void addSubscriber()

in core/src/main/java/flex/messaging/services/messaging/SubscriptionManager.java [476:609]


    public void addSubscriber(Object clientId, String selector, String subtopicString, String endpointId, int maxFrequency) {
        Subtopic subtopic = getSubtopic(subtopicString);
        MessageClient client = null;
        TopicSubscription topicSub;
        Map<Object, MessageClient> subs;
        Map<Subtopic, TopicSubscription> map;

        try {
            // Handle resubscribes from the same client and duplicate subscribes from different clients
            boolean subscriptionAlreadyExists = (getSubscriber(clientId) != null);
            client = getMessageClient(clientId, endpointId);

            FlexClient flexClient = FlexContext.getFlexClient();
            if (subscriptionAlreadyExists) {
                // Block duplicate subscriptions from multiple FlexClients if they
                // attempt to use the same clientId.  (when this is called from a remote
                // subscription, there won't be a flex client so skip this test).
                if (flexClient != null && !flexClient.getId().equals(client.getFlexClient().getId())) {
                    ServiceException se = new ServiceException();
                    se.setMessage(10559, new Object[]{clientId});
                    throw se;
                }

                // It's a resubscribe. Reset the endpoint push state for the subscription to make sure its current
                // because a resubscribe could be arriving over a new endpoint or a new session.
                client.resetEndpoint(endpointId);
            }

            ServiceAdapter adapter = destination.getAdapter();
            client.updateLastUse();

            if (subtopic == null) {
                topicSub = globalSubscribers;
            } else {
                if (!destination.getServerSettings().getAllowSubtopics()) {
                    // Throw an error - the destination doesn't allow subtopics.
                    ServiceException se = new ServiceException();
                    se.setMessage(SUBTOPICS_NOT_SUPPORTED, new Object[]{subtopicString, destination.getId()});
                    throw se;
                }

                if (subtopic.containsSubtopicWildcard() && destination.getServerSettings().isDisallowWildcardSubtopics()) {
                    // Attempt to subscribe to the subtopic, ''{0}'', on destination, ''{1}'', that does not allow wilcard subtopics failed.
                    ServiceException se = new ServiceException();
                    se.setMessage(WILDCARD_SUBTOPICS_NOT_ALLOWED, new Object[]{subtopicString, destination.getId()});
                    throw se;
                }

                // Give a MessagingAdapter a chance to block the subscribe.
                if ((adapter instanceof MessagingSecurity) && (subtopic != null)) {
                    if (!((MessagingSecurity) adapter).allowSubscribe(subtopic)) {
                        ServiceException se = new ServiceException();
                        se.setMessage(10557, new Object[]{subtopicString});
                        throw se;
                    }
                }

                /*
                 * If there is a wildcard, we always need to match that subscription
                 * against the producer.  If it has no wildcard, we can do a quick
                 * lookup to find the subscribers.
                 */
                if (subtopic.containsSubtopicWildcard())
                    map = subscribersPerSubtopicWildcard;
                else
                    map = subscribersPerSubtopic;

                synchronized (this) {
                    topicSub = map.get(subtopic);
                    if (topicSub == null) {
                        topicSub = new TopicSubscription();
                        map.put(subtopic, topicSub);
                    }
                }
            }

            /* Subscribing with no selector */
            if (selector == null) {
                subs = topicSub.defaultSubscriptions;
                if (subs == null) {
                    synchronized (this) {
                        if ((subs = topicSub.defaultSubscriptions) == null)
                            topicSub.defaultSubscriptions = subs = new ConcurrentHashMap<Object, MessageClient>();
                    }
                }
            }
            /* Subscribing with a selector - store all subscriptions under the selector key */
            else {
                synchronized (this) {
                    if (topicSub.selectorSubscriptions == null)
                        topicSub.selectorSubscriptions = new ConcurrentHashMap<String, Map<Object, MessageClient>>();
                }

                subs = topicSub.selectorSubscriptions.get(selector);
                if (subs == null) {
                    synchronized (this) {
                        if ((subs = topicSub.selectorSubscriptions.get(selector)) == null)
                            topicSub.selectorSubscriptions.put(selector, subs = new ConcurrentHashMap<Object, MessageClient>());
                    }
                }
            }

            if (subs.containsKey(clientId)) {
                /* I'd rather this be an error but in 2.0 we allowed this without error */
                if (Log.isWarn())
                    Log.getLogger(LogCategories.MESSAGE_SELECTOR).warn("Client: " + clientId + " already subscribed to: " + destination.getId() + " selector: " + selector + " subtopic: " + subtopicString);
            } else {
                client.addSubscription(selector, subtopicString, maxFrequency);
                synchronized (this) {
                    /*
                     * Make sure other members of the cluster know that we are subscribed to
                     * this info if we are in server-to-server mode
                     *
                     * This has to be done in the synchronized section so that we properly
                     * order subscribe and unsubscribe messages for our peers so their
                     * subscription state matches the one in the local server.
                     */
                    if (subs.isEmpty() && destination.isClustered() &&
                            destination.getServerSettings().getRoutingMode() == RoutingMode.SERVER_TO_SERVER)
                        sendSubscriptionToPeer(true, selector, subtopicString);
                    subs.put(clientId, client);
                }
                monitorTimeout(client); // local operation, timeouts on remote host are not started until failover

                // Finally, if a new MessageClient was created, notify its created
                // listeners now that MessageClient's subscription state is setup.
                if (!subscriptionAlreadyExists)
                    client.notifyCreatedListeners();
            }
        } finally {
            releaseMessageClient(client);
        }

    }