protected Message manageSubscriptions()

in core/src/main/java/flex/messaging/services/MessageService.java [801:1008]


    protected Message manageSubscriptions(CommandMessage command) {
        Message replyMessage = null;

        MessageDestination destination = (MessageDestination) getDestination(command);
        SubscriptionManager subscriptionManager = destination.getSubscriptionManager();

        Object clientId = command.getClientId();
        String endpointId = (String) command.getHeader(Message.ENDPOINT_HEADER);

        String subtopicString = (String) command.getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME);

        ServiceAdapter adapter = destination.getAdapter();

        if (command.getOperation() == CommandMessage.SUBSCRIBE_OPERATION) {
            String selectorExpr = (String) command.getHeader(CommandMessage.SELECTOR_HEADER);

            getMessageBroker().inspectChannel(command, destination);

            // Give MessagingAdapter a chance to block the subscribe.
            if ((adapter instanceof MessagingAdapter)) {
                MessagingSecurityConstraintManager manager = ((MessagingAdapter) adapter).getSecurityConstraintManager();
                if (manager != null)
                    manager.assertSubscribeAuthorization();
            }

            try {
                /*
                 * This allows parallel add/remove subscribe calls (protected by the
                 * concurrent hash table) but prevents us from doing any table mods
                 * when the getSubscriptionState method is active
                 */
                subscribeLock.readLock().lock();

                if (adapter.handlesSubscriptions()) {
                    replyMessage = (Message) adapter.manage(command);
                } else {
                    testSelector(selectorExpr, command);
                }
                /*
                 * Even if the adapter is managing the subscription, we still need to
                 * register this with the subscription manager so that we can match the
                 * endpoint with the clientId.  I am not sure I like this though because
                 * now the subscription is registered both with the adapter and with our
                 * system so keeping them in sync is potentially problematic.   Also, it
                 * seems like the adapter should have the option to manage endpoints themselves?
                 */

                // Extract the maxFrequency that might have been specified by the client.
                int maxFrequency = processMaxFrequencyHeader(command);
                subscriptionManager.addSubscriber(clientId, selectorExpr, subtopicString, endpointId, maxFrequency);
            } finally {
                subscribeLock.readLock().unlock();
            }

            if (replyMessage == null)
                replyMessage = new AcknowledgeMessage();
        } else if (command.getOperation() == CommandMessage.UNSUBSCRIBE_OPERATION) {
            // Give MessagingAdapter a chance to block the unsubscribe, as long as
            // the subscription has not been invalidated.
            if ((adapter instanceof MessagingAdapter) && command.getHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER) == null) {
                MessagingSecurityConstraintManager manager = ((MessagingAdapter) adapter).getSecurityConstraintManager();
                if (manager != null)
                    manager.assertSubscribeAuthorization();
            }

            String selectorExpr = (String) command.getHeader(CommandMessage.SELECTOR_HEADER);

            try {
                subscribeLock.readLock().lock();

                if (adapter.handlesSubscriptions()) {
                    replyMessage = (Message) adapter.manage(command);
                }
                subscriptionManager.removeSubscriber(clientId, selectorExpr, subtopicString, endpointId);
            } finally {
                subscribeLock.readLock().unlock();
            }

            if (replyMessage == null)
                replyMessage = new AcknowledgeMessage();
        } else if (command.getOperation() == CommandMessage.MULTI_SUBSCRIBE_OPERATION) {
            getMessageBroker().inspectChannel(command, destination);

            // Give MessagingAdapter a chance to block the multi subscribe.
            if ((adapter instanceof MessagingAdapter)) {
                MessagingSecurityConstraintManager manager = ((MessagingAdapter) adapter).getSecurityConstraintManager();
                if (manager != null)
                    manager.assertSubscribeAuthorization();
            }

            try {
                /*
                 * This allows parallel add/remove subscribe calls (protected by the
                 * concurrent hash table) but prevents us from doing any table mods
                 * when the getSubscriptionState method is active
                 */
                subscribeLock.readLock().lock();

                if (adapter.handlesSubscriptions()) {
                    replyMessage = (Message) adapter.manage(command);
                }

                // Deals with legacy collection setting
                Object[] adds = getObjectArrayFromHeader(command.getHeader(CommandMessage.ADD_SUBSCRIPTIONS));
                Object[] rems = getObjectArrayFromHeader(command.getHeader(CommandMessage.REMOVE_SUBSCRIPTIONS));

                if (adds != null) {
                    // Extract the maxFrequency that might have been specified
                    // by the client for every subscription (selector/subtopic).
                    int maxFrequency = processMaxFrequencyHeader(command);
                    for (int i = 0; i < adds.length; i++) {
                        // Use the maxFrequency by default.
                        int maxFrequencyPerSubscription = maxFrequency;
                        String ss = (String) adds[i];
                        int ix = ss.indexOf(CommandMessage.SUBTOPIC_SEPARATOR);
                        if (ix != -1) {
                            String subtopic = (ix == 0 ? null : ss.substring(0, ix));
                            String selector = null;
                            String selectorAndMaxFrequency = ss.substring(ix + CommandMessage.SUBTOPIC_SEPARATOR.length());
                            if (selectorAndMaxFrequency.length() != 0) {
                                int ix2 = selectorAndMaxFrequency.indexOf(CommandMessage.SUBTOPIC_SEPARATOR);
                                if (ix2 != -1) {
                                    selector = (ix2 == 0 ? null : selectorAndMaxFrequency.substring(0, ix2));
                                    String maxFrequencyString = selectorAndMaxFrequency.substring(ix2 + CommandMessage.SUBTOPIC_SEPARATOR.length());
                                    if (maxFrequencyString.length() != 0) {
                                        // Choose the minimum of Consumer level maxFrequency and subscription level maxFrequency.
                                        int maxFrequencyCandidate = Integer.parseInt(maxFrequencyString);
                                        maxFrequencyPerSubscription = maxFrequencyPerSubscription == 0 ? maxFrequencyCandidate : Math.min(maxFrequencyPerSubscription, maxFrequencyCandidate);
                                    }
                                } else {
                                    selector = selectorAndMaxFrequency;
                                }
                            }
                            subscriptionManager.addSubscriber(clientId, selector, subtopic, endpointId, maxFrequencyPerSubscription);
                        }
                        // invalid message
                    }
                }

                if (rems != null) {
                    for (int i = 0; i < rems.length; i++) {
                        String ss = (String) rems[i];
                        int ix = ss.indexOf(CommandMessage.SUBTOPIC_SEPARATOR);
                        if (ix != -1) {
                            String subtopic = (ix == 0 ? null : ss.substring(0, ix));
                            String selector = null;
                            String selectorAndMaxFrequency = ss.substring(ix + CommandMessage.SUBTOPIC_SEPARATOR.length());
                            if (selectorAndMaxFrequency.length() != 0) {
                                int ix2 = selectorAndMaxFrequency.indexOf(CommandMessage.SUBTOPIC_SEPARATOR);
                                if (ix2 != -1)
                                    selector = ix2 == 0 ? null : selectorAndMaxFrequency.substring(0, ix2);
                                else
                                    selector = selectorAndMaxFrequency;
                            }
                            subscriptionManager.removeSubscriber(clientId, selector, subtopic, endpointId);
                        }
                    }
                }
            } finally {
                subscribeLock.readLock().unlock();
            }

            if (replyMessage == null)
                replyMessage = new AcknowledgeMessage();
        } else if (command.getOperation() == CommandMessage.POLL_OPERATION) {
            // This code path handles poll messages sent by Consumer.receive().
            // This API should not trigger server side waits, so we invoke poll
            // and if there are no queued messages for this Consumer instance we
            // return an empty acknowledgement immediately.
            MessageClient client = null;
            try {
                client = subscriptionManager.getMessageClient(clientId, endpointId);

                if (client != null) {
                    if (adapter.handlesSubscriptions()) {
                        List missedMessages = (List) adapter.manage(command);
                        if (missedMessages != null && !missedMessages.isEmpty()) {
                            MessageBroker broker = getMessageBroker();
                            for (Iterator iter = missedMessages.iterator(); iter.hasNext(); )
                                broker.routeMessageToMessageClient((Message) iter.next(), client);
                        }
                    }
                    FlushResult flushResult = client.getFlexClient().poll(client);
                    List messagesToReturn = (flushResult != null) ? flushResult.getMessages() : null;
                    if (messagesToReturn != null && !messagesToReturn.isEmpty()) {
                        replyMessage = new CommandMessage(CommandMessage.CLIENT_SYNC_OPERATION);
                        replyMessage.setBody(messagesToReturn.toArray());
                    } else {
                        replyMessage = new AcknowledgeMessage();
                    }
                    // Adaptive poll wait is never used in responses to Consumer.receive() calls.
                } else {
                    ServiceException se = new ServiceException();
                    se.setCode(NOT_SUBSCRIBED_CODE);
                    se.setMessage(NOT_SUBSCRIBED, new Object[]{destination.getId()});
                    throw se;
                }
            } finally {
                subscriptionManager.releaseMessageClient(client);
            }
        } else {
            ServiceException se = new ServiceException();
            se.setMessage(UNKNOWN_COMMAND, new Object[]{new Integer(command.getOperation())});
            throw se;
        }

        return replyMessage;
    }