protected void onStompSubscribe()

in activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java [563:681]


    protected void onStompSubscribe(StompFrame command) throws ProtocolException {
        checkConnected();
        FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
        Map<String, String> headers = command.getHeaders();

        String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
        String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);

        if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) {
            throw new ProtocolException("SUBSCRIBE received without a subscription id!");
        }

        if (destination == null || "".equals(destination)) {
            throw new ProtocolException("Invalid empty or 'null' Destination header");
        }

        final ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);

        if (actualDest == null) {
            throw new ProtocolException("Invalid 'null' Destination.");
        }

        final ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
        ConsumerInfo consumerInfo = new ConsumerInfo(id);
        consumerInfo.setPrefetchSize(actualDest.isQueue() ?
                ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH :
                headers.containsKey("activemq.subscriptionName") ?
                        ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH : ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH);
        consumerInfo.setDispatchAsync(true);

        String browser = headers.get(Stomp.Headers.Subscribe.BROWSER);
        if (browser != null && browser.equals(Stomp.TRUE)) {

            if (this.version.equals(Stomp.V1_0)) {
                throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1+ clients!");
            }

            consumerInfo.setBrowser(true);
            consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH);
        }

        String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
        if (selector != null) {
            consumerInfo.setSelector("convert_string_expressions:" + selector);
        }

        IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");

        if (actualDest.isQueue() && consumerInfo.getSubscriptionName() != null) {
            throw new ProtocolException("Invalid Subscription: cannot durably subscribe to a Queue destination!");
        }

        consumerInfo.setDestination(actualDest);
        consumerInfo.setDispatchAsync(true);

        StompSubscription stompSubscription;
        if (!consumerInfo.isBrowser()) {
            stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION), pendingAcksTracker);
        } else {
            stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION), pendingAcksTracker);
        }
        stompSubscription.setDestination(actualDest);

        String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
        if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
            stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
        } else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
            stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
        } else {
            stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
        }

        subscriptionsByConsumerId.put(id, stompSubscription);
        // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set.
        if (subscriptionId != null) {
            subscriptions.put(subscriptionId, stompSubscription);
        }

        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
        if (receiptId != null && consumerInfo.getPrefetchSize() > 0) {

            final StompFrame cmd = command;
            final int prefetch = consumerInfo.getPrefetchSize();

            // Since dispatch could beat the receipt we set prefetch to zero to start and then
            // once we've sent our Receipt we are safe to turn on dispatch if the response isn't
            // an error message.
            consumerInfo.setPrefetchSize(0);

            final ResponseHandler handler = new ResponseHandler() {
                @Override
                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
                    if (response.isException()) {
                        // Generally a command can fail.. but that does not invalidate the connection.
                        // We report back the failure but we don't close the connection.
                        Throwable exception = ((ExceptionResponse)response).getException();
                        handleException(exception, cmd);
                    } else {
                        StompFrame sc = new StompFrame();
                        sc.setAction(Stomp.Responses.RECEIPT);
                        sc.setHeaders(new HashMap<String, String>(1));
                        sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
                        stompTransport.sendToStomp(sc);

                        ConsumerControl control = new ConsumerControl();
                        control.setPrefetch(prefetch);
                        control.setDestination(actualDest);
                        control.setConsumerId(id);

                        sendToActiveMQ(control, null);
                    }
                }
            };

            sendToActiveMQ(consumerInfo, handler);
        } else {
            sendToActiveMQ(consumerInfo, createResponseHandler(command));
        }
    }