in activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java [563:681]
protected void onStompSubscribe(StompFrame command) throws ProtocolException {
checkConnected();
FrameTranslator translator = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION));
Map<String, String> headers = command.getHeaders();
String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) {
throw new ProtocolException("SUBSCRIBE received without a subscription id!");
}
if (destination == null || "".equals(destination)) {
throw new ProtocolException("Invalid empty or 'null' Destination header");
}
final ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
if (actualDest == null) {
throw new ProtocolException("Invalid 'null' Destination.");
}
final ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setPrefetchSize(actualDest.isQueue() ?
ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH :
headers.containsKey("activemq.subscriptionName") ?
ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH : ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH);
consumerInfo.setDispatchAsync(true);
String browser = headers.get(Stomp.Headers.Subscribe.BROWSER);
if (browser != null && browser.equals(Stomp.TRUE)) {
if (this.version.equals(Stomp.V1_0)) {
throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1+ clients!");
}
consumerInfo.setBrowser(true);
consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH);
}
String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
if (selector != null) {
consumerInfo.setSelector("convert_string_expressions:" + selector);
}
IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
if (actualDest.isQueue() && consumerInfo.getSubscriptionName() != null) {
throw new ProtocolException("Invalid Subscription: cannot durably subscribe to a Queue destination!");
}
consumerInfo.setDestination(actualDest);
consumerInfo.setDispatchAsync(true);
StompSubscription stompSubscription;
if (!consumerInfo.isBrowser()) {
stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION), pendingAcksTracker);
} else {
stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION), pendingAcksTracker);
}
stompSubscription.setDestination(actualDest);
String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
if (Stomp.Headers.Subscribe.AckModeValues.CLIENT.equals(ackMode)) {
stompSubscription.setAckMode(StompSubscription.CLIENT_ACK);
} else if (Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL.equals(ackMode)) {
stompSubscription.setAckMode(StompSubscription.INDIVIDUAL_ACK);
} else {
stompSubscription.setAckMode(StompSubscription.AUTO_ACK);
}
subscriptionsByConsumerId.put(id, stompSubscription);
// Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set.
if (subscriptionId != null) {
subscriptions.put(subscriptionId, stompSubscription);
}
final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
if (receiptId != null && consumerInfo.getPrefetchSize() > 0) {
final StompFrame cmd = command;
final int prefetch = consumerInfo.getPrefetchSize();
// Since dispatch could beat the receipt we set prefetch to zero to start and then
// once we've sent our Receipt we are safe to turn on dispatch if the response isn't
// an error message.
consumerInfo.setPrefetchSize(0);
final ResponseHandler handler = new ResponseHandler() {
@Override
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
// Generally a command can fail.. but that does not invalidate the connection.
// We report back the failure but we don't close the connection.
Throwable exception = ((ExceptionResponse)response).getException();
handleException(exception, cmd);
} else {
StompFrame sc = new StompFrame();
sc.setAction(Stomp.Responses.RECEIPT);
sc.setHeaders(new HashMap<String, String>(1));
sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
stompTransport.sendToStomp(sc);
ConsumerControl control = new ConsumerControl();
control.setPrefetch(prefetch);
control.setDestination(actualDest);
control.setConsumerId(id);
sendToActiveMQ(control, null);
}
}
};
sendToActiveMQ(consumerInfo, handler);
} else {
sendToActiveMQ(consumerInfo, createResponseHandler(command));
}
}