in broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java [822:925]
protected void detach(Error error, final boolean close)
{
if (_consumerTarget != null)
{
_consumerTarget.close();
}
Source source = getSource();
TerminusExpiryPolicy expiryPolicy = source.getExpiryPolicy();
NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace();
List<Symbol> sourceCapabilities = source.getCapabilities() == null ? Collections.emptyList() : Arrays.asList(source.getCapabilities());
if (close
|| TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy)
|| ((expiryPolicy == null || TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy)) && getSession().isClosing())
|| (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && getSession().getConnection().isClosing()))
{
cleanUpUnsettledDeliveries();
}
if (close)
{
Error closingError = null;
if (getDestination() instanceof ExchangeSendingDestination
&& addressSpace instanceof QueueManagingVirtualHost && TerminusExpiryPolicy.NEVER.equals(expiryPolicy))
{
try
{
((QueueManagingVirtualHost) addressSpace).removeSubscriptionQueue(
((ExchangeSendingDestination) getDestination()).getQueue().getName());
TerminusDurability sourceDurability = source.getDurable();
if (sourceDurability != null
&& !TerminusDurability.NONE.equals(sourceDurability)
&& sourceCapabilities.contains(Session_1_0.SHARED_CAPABILITY)
&& sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY))
{
final Pattern containerIdPattern = sourceCapabilities.contains(Session_1_0.GLOBAL_CAPABILITY)
? ANY_CONTAINER_ID
: Pattern.compile("^" + Pattern.quote(getSession().getConnection().getRemoteContainerId()) + "$");
final Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(getLinkName()) + "\\|?\\d*$");
addressSpace.visitSendingLinks((LinkRegistryModel.LinkVisitor<Link_1_0<Source, Target>>) link -> {
if (containerIdPattern.matcher(link.getRemoteContainerId()).matches()
&& linkNamePattern.matcher(link.getName()).matches())
{
link.linkClosed();
}
return false;
});
}
}
catch (AccessControlException e)
{
LOGGER.error("Error unregistering subscription", e);
closingError = new Error(AmqpError.NOT_ALLOWED, "Error unregistering subscription");
}
catch (IllegalStateException e)
{
String message;
if(sourceCapabilities.contains(Session_1_0.SHARED_CAPABILITY)
&& sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY))
{
String subscriptionName = getLinkName();
int separator = subscriptionName.indexOf("|");
if (separator > 0)
{
subscriptionName = subscriptionName.substring(0, separator);
}
message = "There are active consumers on the shared subscription '"+subscriptionName+"'";
}
else
{
message = e.getMessage();
}
closingError = new Error(AmqpError.RESOURCE_LOCKED, message);
}
catch (NotFoundException e)
{
closingError = new Error(AmqpError.NOT_FOUND, e.getMessage());
}
}
if (error == null)
{
error = closingError;
}
else
{
LOGGER.warn("Unexpected error on detaching endpoint {}: {}", getLinkName(), error);
}
}
else if (addressSpace instanceof QueueManagingVirtualHost
&& ((QueueManagingVirtualHost) addressSpace).isDiscardGlobalSharedSubscriptionLinksOnDetach()
&& sourceCapabilities.contains(Session_1_0.SHARED_CAPABILITY)
&& sourceCapabilities.contains(Session_1_0.GLOBAL_CAPABILITY)
&& sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY)
&& !getLinkName().endsWith("|global"))
{
// For JMS 2.0 global shared subscriptions we do not want to keep the links hanging around.
// However, we keep one link (ending with "|global") to perform a null-source lookup upon un-subscription.
getLink().linkClosed();
}
super.detach(error, close);
}