in activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java [63:130]
/**
 * Sets up the configured durable destinations for this bridge.
 * <p>
 * After delegating to the superclass, each entry of {@code durableDestinations}
 * (skipped entirely when the configuration is dynamic-only) is handled as follows:
 * <ul>
 * <li>permissible and without an existing consumer: for topics, a demand subscription
 * is created from a matching durable subscription found in the local topic region;</li>
 * <li>not permissible while {@code syncDurableSubs} is enabled: for topics, a
 * {@code RemoveSubscriptionInfo} is sent to the local broker for the durable
 * subscription owned by this bridge's local client id.</li>
 * </ul>
 */
protected void setupStaticDestinations() {
    super.setupStaticDestinations();
    ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations;
    if (dests == null) {
        return;
    }
    for (ActiveMQDestination dest : dests) {
        if (isPermissableDestination(dest) && !doesConsumerExist(dest)) {
            try {
                //Filtering by non-empty subscriptions, see AMQ-5875
                if (dest.isTopic()) {
                    addDemandForExistingDurableSub(dest);
                }
            } catch (IOException e) {
                LOG.error("Failed to add static destination {}", dest, e);
            }
            // Note: emitted for every destination that reaches this branch, whether or not
            // a demand subscription was actually added above.
            LOG.trace("Forwarding messages for durable destination: {}", dest);
        } else if (configuration.isSyncDurableSubs() && !isPermissableDestination(dest) && dest.isTopic()) {
            removeNoLongerPermittedDurableSub(dest);
        }
    }
}

/**
 * Scans the local topic region's durable subscriptions for one whose subscription name
 * equals this bridge's subscriber name for {@code dest} and whose client id starts with
 * the configured name. The first such subscription that yields a non-null demand
 * subscription is marked statically included and registered; the scan then stops.
 *
 * @param dest the topic destination to create a demand subscription for
 * @throws IOException propagated from creating or adding the demand subscription
 */
private void addDemandForExistingDurableSub(ActiveMQDestination dest) throws IOException {
    RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
    TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
    String candidateSubName = getSubscriberName(dest);

    for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
        ConsumerInfo subInfo = subscription.getConsumerInfo();
        String subName = subInfo.getSubscriptionName();
        String clientId = subscription.getContext().getClientId();
        if (subName == null || !subName.equals(candidateSubName)
                || !clientId.startsWith(configuration.getName())) {
            continue;
        }
        // Include the brokerPath if it exists so that we can handle TTL more correctly
        // This only works if the consumers are online as offline consumers are missing TTL
        // For TTL > 1 configurations setting dynamicOnly to true may make more sense
        DemandSubscription sub = createDemandSubscription(dest, subName, subInfo.getBrokerPath());
        if (sub != null) {
            sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
            sub.setStaticallyIncluded(true);
            addSubscription(sub);
            break;
        }
    }
}

/**
 * Requests removal of this bridge's durable subscription for {@code dest} from the local
 * broker, because the destination is no longer permissible.
 * <p>
 * Durable subscriptions with the same name but a different client id are skipped and the
 * scan continues, so a same-named subscription that does not belong to this bridge cannot
 * hide this bridge's own subscription. The scan stops after the first removal attempt,
 * whether it succeeded or failed.
 *
 * @param dest the topic destination whose durable subscription should be removed
 */
private void removeNoLongerPermittedDurableSub(ActiveMQDestination dest) {
    RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
    TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
    String candidateSubName = getSubscriberName(dest);

    for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) {
        String subName = subscription.getConsumerInfo().getSubscriptionName();
        if (subName == null || !subName.equals(candidateSubName)
                || !(subscription instanceof DurableTopicSubscription)) {
            continue;
        }
        DurableTopicSubscription durableSub = (DurableTopicSubscription) subscription;
        //check the clientId so we only remove subs for the matching bridge
        if (!durableSub.getSubscriptionKey().getClientId().equals(localClientId)) {
            // Same subscription name under a different client id: keep looking for ours.
            continue;
        }
        try {
            // remove the NC subscription as it is no longer for a permissible dest
            RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
            sending.setClientId(localClientId);
            sending.setSubscriptionName(subName);
            sending.setConnectionId(this.localConnectionInfo.getConnectionId());
            localBroker.oneway(sending);
        } catch (IOException e) {
            LOG.debug("Exception removing NC durable subscription: {}", subName, e);
            serviceRemoteException(e);
        }
        break;
    }
}