in activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java [258:412]
/**
 * Sets up the broker-side consumer that backs a remotely opened AMQP sender link.
 *
 * <p>The remote Source terminus determines the destination and the filters to apply:
 * <ul>
 *   <li>No remote Source: the link name is used to look up a stored subscription. If one
 *       is found, a Source describing it (address, durability, expiry, filters) is built;
 *       otherwise the link is closed with {@code AmqpError.NOT_FOUND}.</li>
 *   <li>Dynamic Source: a temporary destination is created and a close action is registered
 *       on the sender to delete it again.</li>
 *   <li>Any other Source: the destination is resolved through {@code createDestination};
 *       a temporary destination without a connection id is rejected.</li>
 * </ul>
 * The resulting {@code ConsumerInfo} is sent to the broker. The link is opened when the
 * broker answers successfully and closed with a mapped error condition otherwise.
 *
 * @param protonSender the proton sender link that the remote peer attached
 * @throws Exception on any failure other than an {@code AmqpProtocolException}; that
 *         exception type is caught here and turned into a link close carrying its
 *         symbolic name and message
 */
public void createSender(final Sender protonSender) throws Exception {
    org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) protonSender.getRemoteSource();

    ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
    final AmqpSender sender = new AmqpSender(this, protonSender, consumerInfo);

    LOG.debug("opening new sender {} on link: {}", consumerInfo.getConsumerId(), protonSender.getName());

    try {
        // Filters that are actually honoured; these are echoed back to the peer on the Source.
        final Map<Symbol, Object> supportedFilters = new HashMap<>();
        protonSender.setContext(sender);

        boolean noLocal = false;
        String selector = null;

        if (source != null) {
            Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
            if (filter != null) {
                selector = filter.getValue().getDescribed().toString();
                // Validate the Selector up front so a bad one is rejected before the broker is involved.
                try {
                    SelectorParser.parse(selector);
                } catch (InvalidSelectorException e) {
                    sender.close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
                    return;
                }

                supportedFilters.put(filter.getKey(), filter.getValue());
            }

            filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
            if (filter != null) {
                noLocal = true;
                supportedFilters.put(filter.getKey(), filter.getValue());
            }
        }

        ActiveMQDestination destination;
        if (source == null) {
            // Attempt to recover previous subscription
            ConsumerInfo storedInfo = connection.lookupSubscription(protonSender.getName());

            if (storedInfo != null) {
                destination = storedInfo.getDestination();

                source = new org.apache.qpid.proton.amqp.messaging.Source();
                source.setAddress(destination.getQualifiedName());
                source.setDurable(TerminusDurability.UNSETTLED_STATE);
                source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
                source.setDistributionMode(COPY);

                // Whatever filter is advertised on the rebuilt Source must also be applied to the
                // ConsumerInfo below; otherwise the peer is told a filter is active while the
                // consumer is registered without it.
                if (storedInfo.isNoLocal()) {
                    noLocal = true;
                    supportedFilters.put(NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
                }

                String storedSelector = storedInfo.getSelector();
                if (storedSelector != null && !storedSelector.trim().isEmpty()) {
                    selector = storedSelector;
                    supportedFilters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(storedSelector));
                }
            } else {
                sender.close(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + protonSender.getName()));
                return;
            }
        } else if (source.getDynamic()) {
            destination = connection.createTemporaryDestination(protonSender, source.getCapabilities());

            Map<Symbol, Object> dynamicNodeProperties = new HashMap<>();
            dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());

            // Currently we only support temporary destinations with delete on close lifetime policy.
            source = new org.apache.qpid.proton.amqp.messaging.Source();
            source.setAddress(destination.getQualifiedName());
            source.setCapabilities(AmqpSupport.getDestinationTypeSymbol(destination));
            source.setDynamic(true);
            source.setDynamicNodeProperties(dynamicNodeProperties);

            // Remove the temporary destination again once the sender is closed.
            sender.addCloseAction(new Runnable() {

                @Override
                public void run() {
                    connection.deleteTemporaryDestination((ActiveMQTempDestination) sender.getDestination());
                }
            });
        } else {
            destination = createDestination(source);
            if (destination.isTemporary()) {
                String connectionId = ((ActiveMQTempDestination) destination).getConnectionId();
                if (connectionId == null) {
                    throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), "Not a broker created temp destination");
                }
            }
        }

        // Report back only the filters that were accepted; null when there are none.
        source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
        protonSender.setSource(source);

        int senderCredit = protonSender.getRemoteCredit();

        // Allows the options on the destination to configure the consumerInfo
        if (destination.getOptions() != null) {
            Map<String, Object> options = IntrospectionSupport.extractProperties(
                new HashMap<String, Object>(destination.getOptions()), "consumer.");
            IntrospectionSupport.setProperties(consumerInfo, options);
            // Anything still left in the map at this point is reported as an unknown option.
            if (options.size() > 0) {
                String msg = "There are " + options.size()
                    + " consumer options that couldn't be set on the consumer."
                    + " Check the options are spelled correctly."
                    + " Unknown parameters=[" + options + "]."
                    + " This consumer cannot be started.";
                LOG.warn(msg);
                throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), msg);
            }
        }

        consumerInfo.setSelector(selector);
        consumerInfo.setNoRangeAcks(true);
        consumerInfo.setDestination(destination);
        // The credit granted by the peer is used as the prefetch size; negative credit is clamped to 0.
        consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0);
        consumerInfo.setDispatchAsync(true);
        consumerInfo.setNoLocal(noLocal);

        // A COPY distribution mode on a queue is mapped to a browser consumer.
        if (source.getDistributionMode() == COPY && destination.isQueue()) {
            consumerInfo.setBrowser(true);
        }

        // A durable terminus on a topic becomes a named subscription keyed by the link name.
        if ((TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
             TerminusDurability.CONFIGURATION.equals(source.getDurable())) && destination.isTopic()) {
            consumerInfo.setSubscriptionName(protonSender.getName());
        }

        connection.sendToActiveMQ(consumerInfo, new ResponseHandler() {
            @Override
            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                if (response.isException()) {
                    // Map the broker-side failure onto the closest AMQP error condition.
                    final ErrorCondition error;
                    Throwable exception = ((ExceptionResponse) response).getException();
                    if (exception instanceof SecurityException) {
                        error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage());
                    } else if (exception instanceof InvalidSelectorException) {
                        error = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage());
                    } else {
                        error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage());
                    }
                    sender.close(error);
                } else {
                    sender.open();
                }
                pumpProtonToSocket();
            }
        });
    } catch (AmqpProtocolException e) {
        // Protocol-level problems close the link with the error's own symbolic name instead of propagating.
        sender.close(new ErrorCondition(Symbol.getSymbol(e.getSymbolicName()), e.getMessage()));
    }
}