protected void detach()

in broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java [822:925]


    protected void detach(Error error, final boolean close)
    {
        if (_consumerTarget != null)
        {
            _consumerTarget.close();
        }

        Source source = getSource();
        TerminusExpiryPolicy expiryPolicy = source.getExpiryPolicy();
        NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace();
        List<Symbol> sourceCapabilities = source.getCapabilities() == null ? Collections.emptyList() : Arrays.asList(source.getCapabilities());

        if (close
            || TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy)
            || ((expiryPolicy == null || TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy)) && getSession().isClosing())
            || (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && getSession().getConnection().isClosing()))
        {
            cleanUpUnsettledDeliveries();
        }

        if (close)
        {
            Error closingError = null;
            if (getDestination() instanceof ExchangeSendingDestination
                && addressSpace instanceof QueueManagingVirtualHost && TerminusExpiryPolicy.NEVER.equals(expiryPolicy))
            {
                try
                {
                    ((QueueManagingVirtualHost) addressSpace).removeSubscriptionQueue(
                            ((ExchangeSendingDestination) getDestination()).getQueue().getName());

                    TerminusDurability sourceDurability = source.getDurable();
                    if (sourceDurability != null
                        && !TerminusDurability.NONE.equals(sourceDurability)
                        && sourceCapabilities.contains(Session_1_0.SHARED_CAPABILITY)
                        && sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY))
                    {
                        final Pattern containerIdPattern = sourceCapabilities.contains(Session_1_0.GLOBAL_CAPABILITY)
                                ? ANY_CONTAINER_ID
                                : Pattern.compile("^" + Pattern.quote(getSession().getConnection().getRemoteContainerId()) + "$");
                        final Pattern linkNamePattern = Pattern.compile("^" + Pattern.quote(getLinkName()) + "\\|?\\d*$");

                        addressSpace.visitSendingLinks((LinkRegistryModel.LinkVisitor<Link_1_0<Source, Target>>) link -> {
                            if (containerIdPattern.matcher(link.getRemoteContainerId()).matches()
                                && linkNamePattern.matcher(link.getName()).matches())
                            {
                                link.linkClosed();
                            }
                            return false;
                        });
                    }
                }
                catch (AccessControlException e)
                {
                    LOGGER.error("Error unregistering subscription", e);
                    closingError = new Error(AmqpError.NOT_ALLOWED, "Error unregistering subscription");
                }
                catch (IllegalStateException e)
                {
                    String message;
                    if(sourceCapabilities.contains(Session_1_0.SHARED_CAPABILITY)
                       && sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY))
                    {
                        String subscriptionName = getLinkName();
                        int separator = subscriptionName.indexOf("|");
                        if (separator > 0)
                        {
                            subscriptionName = subscriptionName.substring(0, separator);
                        }
                        message = "There are active consumers on the shared subscription '"+subscriptionName+"'";
                    }
                    else
                    {
                        message = e.getMessage();
                    }
                    closingError = new Error(AmqpError.RESOURCE_LOCKED, message);
                }
                catch (NotFoundException e)
                {
                    closingError = new Error(AmqpError.NOT_FOUND, e.getMessage());
                }
            }
            if (error == null)
            {
                error = closingError;
            }
            else
            {
                LOGGER.warn("Unexpected error on detaching endpoint {}: {}", getLinkName(), error);
            }
        }
        else if (addressSpace instanceof QueueManagingVirtualHost
                 && ((QueueManagingVirtualHost) addressSpace).isDiscardGlobalSharedSubscriptionLinksOnDetach()
                 && sourceCapabilities.contains(Session_1_0.SHARED_CAPABILITY)
                 && sourceCapabilities.contains(Session_1_0.GLOBAL_CAPABILITY)
                 && sourceCapabilities.contains(ExchangeSendingDestination.TOPIC_CAPABILITY)
                 && !getLinkName().endsWith("|global"))
        {
            // For JMS 2.0 global shared subscriptions we do not want to keep the links hanging around.
            // However, we keep one link (ending with "|global") to perform a null-source lookup upon un-subscription.
            getLink().linkClosed();
        }
        super.detach(error, close);
    }