protected void setupStaticDestinations()

in activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java [63:130]


    /**
     * Sets up the statically configured durable destinations of this bridge.
     *
     * <p>After the superclass setup has run, nothing more is done when the
     * configuration is dynamic-only or when no durable destinations are set.
     * Otherwise each durable destination is handled in one of two ways:
     * <ul>
     *   <li>permissible and without an existing consumer: a statically included
     *       demand subscription is added if the local broker holds a matching
     *       durable subscription;</li>
     *   <li>no longer permissible while durable subscription sync is enabled:
     *       the local broker is asked to remove this bridge's durable
     *       subscription for it.</li>
     * </ul>
     */
    protected void setupStaticDestinations() {
        super.setupStaticDestinations();
        if (configuration.isDynamicOnly()) {
            return;
        }
        final ActiveMQDestination[] dests = durableDestinations;
        if (dests == null) {
            return;
        }
        for (ActiveMQDestination dest : dests) {
            if (isPermissableDestination(dest) && !doesConsumerExist(dest)) {
                addStaticDurableDemand(dest);
                LOG.trace("Forwarding messages for durable destination: {}", dest);
            } else if (configuration.isSyncDurableSubs() && !isPermissableDestination(dest)) {
                removeStaleDurableSubscription(dest);
            }
        }
    }

    /**
     * Adds a statically included demand subscription for {@code dest} if the
     * local topic region holds a durable subscription with this bridge's
     * subscriber name whose client id starts with the configured bridge name.
     * Non-topic destinations are ignored. An {@link IOException} raised while
     * creating or adding the subscription is logged and not propagated.
     *
     * @param dest the durable destination to create demand for
     */
    private void addStaticDurableDemand(ActiveMQDestination dest) {
        //Filtering by non-empty subscriptions, see AMQ-5875
        if (!dest.isTopic()) {
            return;
        }
        try {
            RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
            TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();

            String candidateSubName = getSubscriberName(dest);
            for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
                ConsumerInfo subInfo = subscription.getConsumerInfo();
                String subName = subInfo.getSubscriptionName();
                String clientId = subscription.getContext().getClientId();
                if (subName == null || !subName.equals(candidateSubName)
                        || !clientId.startsWith(configuration.getName())) {
                    continue;
                }
                // Include the brokerPath if it exists so that we can handle TTL more correctly
                // This only works if the consumers are online as offline consumers are missing TTL
                // For TTL > 1 configurations setting dynamicOnly to true may make more sense
                DemandSubscription sub = createDemandSubscription(dest, subName, subInfo.getBrokerPath());
                if (sub != null) {
                    // candidateSubName is the value getSubscriberName(dest) returned above
                    sub.getLocalInfo().setSubscriptionName(candidateSubName);
                    sub.setStaticallyIncluded(true);
                    addSubscription(sub);
                    // one demand subscription per destination is enough
                    break;
                }
            }
        } catch (IOException e) {
            LOG.error("Failed to add static destination {}", dest, e);
        }
    }

    /**
     * Asks the local broker to remove this bridge's durable subscription for
     * {@code dest}, which is no longer a permissible destination. Only a
     * {@link DurableTopicSubscription} with this bridge's subscriber name and
     * a client id equal to {@code localClientId} is removed. Non-topic
     * destinations are ignored.
     *
     * @param dest the durable destination that is no longer permissible
     */
    private void removeStaleDurableSubscription(ActiveMQDestination dest) {
        if (!dest.isTopic()) {
            return;
        }
        RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
        TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();

        String candidateSubName = getSubscriberName(dest);
        for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
            String subName = subscription.getConsumerInfo().getSubscriptionName();
            if (subName == null || !subName.equals(candidateSubName)
                    || !(subscription instanceof DurableTopicSubscription)) {
                continue;
            }
            DurableTopicSubscription durableSub = (DurableTopicSubscription) subscription;
            //check the clientId so we only remove subs for the matching bridge
            if (!durableSub.getSubscriptionKey().getClientId().equals(localClientId)) {
                // Keep scanning: a same-named subscription under a different client id
                // is not ours, and stopping here would skip this bridge's own
                // subscription if it comes later in the iteration.
                continue;
            }
            try {
                // remove the NC subscription as it is no longer for a permissible dest
                RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
                sending.setClientId(localClientId);
                sending.setSubscriptionName(subName);
                sending.setConnectionId(this.localConnectionInfo.getConnectionId());
                localBroker.oneway(sending);
            } catch (IOException e) {
                LOG.debug("Exception removing NC durable subscription: {}", subName, e);
                serviceRemoteException(e);
            }
            // the removal for this bridge's subscription has been attempted; stop scanning
            break;
        }
    }