in activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java [937:1082]
/**
 * Handles one consumer-related advisory payload received from the remote broker.
 * <p>
 * The payload is dispatched on its exact class:
 * <ul>
 * <li>{@code ConsumerInfo} - after the browser, TTL, loop and destination filters pass,
 * the consumer is handed to {@code addConsumerInfo}.</li>
 * <li>{@code DestinationInfo} - after the TTL and loop filters pass, the command is re-homed
 * onto the local connection and queued on {@code serialExecutor} for delivery to the
 * local broker.</li>
 * <li>{@code RemoveInfo} - the matching demand subscription (or every member of a recorded
 * composite consumer) is removed.</li>
 * <li>{@code RemoveSubscriptionInfo} - the matching durable remote subscription (or every
 * member of a recorded composite subscription) is removed.</li>
 * </ul>
 * Any other payload type is ignored.
 *
 * @param data the advisory payload extracted from the remote advisory message
 * @throws IOException declared by this method; propagated from the bridge operations it invokes
 */
private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
    // A TTL below zero disables the hop-count checks below.
    final int networkTTL = configuration.getConsumerTTL();
    if (data.getClass() == ConsumerInfo.class) {
        // Create a new local subscription
        ConsumerInfo info = (ConsumerInfo) data;
        BrokerId[] path = info.getBrokerPath();

        if (info.isBrowser()) {
            LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
            return;
        }

        if (path != null && networkTTL > -1 && path.length >= networkTTL) {
            LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}",
                    configuration.getBrokerName(), remoteBrokerName, networkTTL, info);
            return;
        }

        if (contains(path, localBrokerPath[0])) {
            // Ignore this consumer as it's a consumer we locally sent to the broker.
            LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}",
                    configuration.getBrokerName(), remoteBrokerName, info);
            return;
        }

        if (!isPermissableDestination(info.getDestination())) {
            // ignore if not in the permitted or in the excluded list
            LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}",
                    configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info);
            return;
        }

        // in a cyclic network there can be multiple bridges per broker that can propagate
        // a network subscription so there is a need to synchronize on a shared entity
        // if duplicate suppression is required
        if (isDuplicateSuppressionOff(info)) {
            addConsumerInfo(info);
        } else {
            synchronized (brokerService.getVmConnectorURI()) {
                addConsumerInfo(info);
            }
        }
    } else if (data.getClass() == DestinationInfo.class) {
        final DestinationInfo destInfo = (DestinationInfo) data;
        BrokerId[] path = destInfo.getBrokerPath();

        if (path != null && networkTTL > -1 && path.length >= networkTTL) {
            LOG.debug("{} Ignoring destination {} restricted to {} network hops only",
                    configuration.getBrokerName(), destInfo, networkTTL);
            return;
        }

        if (contains(path, localBrokerPath[0])) {
            LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
            return;
        }

        destInfo.setConnectionId(localConnectionInfo.getConnectionId());
        if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
            // re-set connection id so comes from here
            ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
            tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
        }
        destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));

        // Captured once so the trace line and a later delivery failure report the same operation.
        final String operation = destInfo.isAddOperation() ? "add" : "remove";
        LOG.trace("{} bridging {} destination on {} from {}, destination: {}",
                configuration.getBrokerName(), operation, localBroker, remoteBrokerName, destInfo);

        if (destInfo.isRemoveOperation()) {
            // not synced with addSubs so we will need to ignore any potential new subs with a timeout!=0
            destInfo.setTimeout(1);
        }

        // Serialize both add/remove dest with removeSub operations such that all removeSub advisories are generated
        serialExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    localBroker.oneway(destInfo);
                } catch (IOException e) {
                    // This task forwards adds as well as removes, so name the actual operation.
                    // The trailing exception has no placeholder and is logged as the throwable.
                    LOG.warn("failed to deliver {} command for destination: {}", operation, destInfo.getDestination(), e);
                }
            }
        });
    } else if (data.getClass() == RemoveInfo.class) {
        ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();

        // If we have an entry in compositeConsumerIds then this consumer was a
        // composite consumer and we need to remove the entries in the set and
        // not the consumer id we received here
        final Set<ConsumerId> compositeIds = compositeConsumerIds.remove(id);
        if (compositeIds != null) {
            for (ConsumerId compositeId : compositeIds) {
                serviceRemoteConsumerAdvisory(new RemoveInfo(compositeId));
            }
            return;
        }

        removeDemandSubscription(id);

        // Only scan the subscriptions when this id was actually tracked in forcedDurableRemoteId.
        if (forcedDurableRemoteId.remove(id)) {
            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                DemandSubscription ds = i.next();
                boolean removed = ds.removeForcedDurableConsumer(id);
                if (removed) {
                    // The iterator is handed over together with the subscription.
                    cleanupDurableSub(ds, i);
                }
            }
        }
    } else if (data.getClass() == RemoveSubscriptionInfo.class) {
        final RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
        final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());

        // If we have an entry in compositeSubscriptions then this consumer was a
        // composite consumer and we need to remove the entries in the set and not
        // the subscription that we received here
        final Set<SubscriptionInfo> compositeSubs =
                this.compositeSubscriptions.remove(subscriptionInfo);
        if (compositeSubs != null) {
            for (SubscriptionInfo compositeSub : compositeSubs) {
                RemoveSubscriptionInfo remove = new RemoveSubscriptionInfo();
                remove.setClientId(compositeSub.getClientId());
                remove.setSubscriptionName(compositeSub.getSubscriptionName());
                remove.setConnectionId(this.localConnectionInfo.getConnectionId());
                serviceRemoteConsumerAdvisory(remove);
            }
            return;
        }

        final boolean proxyBridgeSub = isProxyBridgeSubscription(subscriptionInfo.getClientId(),
                subscriptionInfo.getSubscriptionName());

        // Alternate lookup key carrying the proxy bridge client id. It is built lazily, at most
        // once, and kept separate from subscriptionInfo so that every DemandSubscription is
        // checked against the original key first, regardless of iteration order.
        SubscriptionInfo proxySubscriptionInfo = null;

        for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
            DemandSubscription ds = i.next();
            boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);

            // If this is a proxy bridge subscription we need to try changing the clientId
            if (!removed && proxyBridgeSub) {
                if (proxySubscriptionInfo == null) {
                    proxySubscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
                    proxySubscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo.getClientId()));
                }
                // remove() already reports whether the entry was present.
                removed = ds.getDurableRemoteSubs().remove(proxySubscriptionInfo);
            }

            if (removed) {
                cleanupDurableSub(ds, i);
            }
        }
    }
}