in qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java [74:134]
protected Receiver createEndpoint(JmsConsumerInfo resourceInfo) {
    // Builds the receiver link for the given consumer: resolves the source address,
    // validates any named subscription against the connection's subscription tracker,
    // picks the link name, and applies the settlement modes before returning the link.
    // Throws JMSRuntimeException when a conflicting durable subscription is already active.
    final JmsDestination consumerDestination = resourceInfo.getDestination();
    final AmqpConnection amqpConnection = getParent().getConnection();
    final String address = AmqpDestinationHelper.getDestinationAddress(consumerDestination, amqpConnection);

    final Source linkSource = new Source();
    linkSource.setAddress(address);
    configureSource(linkSource);

    String linkName = null;
    final String subName = resourceInfo.getSubscriptionName();
    final boolean namedSubscription = !(subName == null || subName.isEmpty());
    if (namedSubscription) {
        final boolean shared = resourceInfo.isShared();

        // The connection did not report shared subscription support, so flag the link
        // to ask for the capability itself (field is declared outside this method).
        if (shared && !amqpConnection.getProperties().isSharedSubsSupported()) {
            validateSharedSubsLinkCapability = true;
        }

        final AmqpSubscriptionTracker tracker = amqpConnection.getSubTracker();

        // Check the requested subscriber type against the durable subscribers already active.
        if (resourceInfo.isDurable()) {
            // An active exclusive durable subscriber blocks both shared and exclusive newcomers.
            if (tracker.isActiveExclusiveDurableSub(subName)) {
                throw new JMSRuntimeException("A non-shared durable subscription is already active with name '" + subName + "'");
            }

            // An exclusive durable subscriber cannot join while shared durable subscribers are active.
            if (!shared && tracker.isActiveSharedDurableSub(subName)) {
                throw new JMSRuntimeException("A shared durable subscription is already active with name '" + subName + "'");
            }
        }

        // Reserve the link name for this subscription; the tracker throws if its own
        // further validations fail.
        linkName = tracker.reserveNextSubscriptionLinkName(subName, resourceInfo);
    }

    // No subscription-derived name available: fall back to one built from the consumer id and address.
    if (linkName == null) {
        linkName = "qpid-jms:receiver:" + resourceInfo.getId() + ":" + address;
    }

    final Receiver link = getParent().getEndpoint().receiver(linkName);
    link.setSource(linkSource);
    link.setTarget(new Target());

    // Browsers and presettled consumers request SETTLED from the sender; all others UNSETTLED.
    final boolean senderSettled = resourceInfo.isBrowser() || resourceInfo.isPresettle();
    link.setSenderSettleMode(senderSettled ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED);
    link.setReceiverSettleMode(ReceiverSettleMode.FIRST);

    if (validateSharedSubsLinkCapability) {
        link.setDesiredCapabilities(new Symbol[] { AmqpSupport.SHARED_SUBS });
    }

    return link;
}