private void serviceRemoteConsumerAdvisory()

in activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java [937:1082]


    /**
     * Dispatches an advisory payload received from the remote broker to the handler
     * for its exact command class. Payloads of any other class are silently ignored.
     * <p>
     * The comparison is on the exact runtime class (not {@code instanceof}), so
     * subclasses of the handled command types are not processed here.
     *
     * @param data the advisory payload; must not be {@code null}
     * @throws IOException if one of the handlers fails while updating the local broker
     */
    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
        final int networkTTL = configuration.getConsumerTTL();
        final Class<?> type = data.getClass();
        if (type == ConsumerInfo.class) {
            onRemoteConsumerInfoAdvisory((ConsumerInfo) data, networkTTL);
        } else if (type == DestinationInfo.class) {
            onRemoteDestinationInfoAdvisory((DestinationInfo) data, networkTTL);
        } else if (type == RemoveInfo.class) {
            onRemoteRemoveInfoAdvisory((RemoveInfo) data);
        } else if (type == RemoveSubscriptionInfo.class) {
            onRemoteRemoveSubscriptionInfoAdvisory((RemoveSubscriptionInfo) data);
        }
    }

    /**
     * Handles a remote consumer being added: creates a matching local subscription
     * unless the consumer is filtered out (browser, hop limit reached, already routed
     * through this broker, or destination not permitted).
     *
     * @param info the remote consumer
     * @param networkTTL maximum number of network hops; a negative value disables the limit
     * @throws IOException if adding the consumer fails
     */
    private void onRemoteConsumerInfoAdvisory(ConsumerInfo info, int networkTTL) throws IOException {
        final BrokerId[] path = info.getBrokerPath();

        if (info.isBrowser()) {
            LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
            return;
        }

        if (path != null && networkTTL > -1 && path.length >= networkTTL) {
            LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}",
                    configuration.getBrokerName(), remoteBrokerName, networkTTL, info);
            return;
        }

        if (contains(path, localBrokerPath[0])) {
            // Ignore this consumer as it's a consumer we locally sent to the broker.
            LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}",
                    configuration.getBrokerName(), remoteBrokerName, info);
            return;
        }

        if (!isPermissableDestination(info.getDestination())) {
            // ignore if not in the permitted or in the excluded list
            LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}",
                    configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info);
            return;
        }

        // in a cyclic network there can be multiple bridges per broker that can propagate
        // a network subscription so there is a need to synchronize on a shared entity
        // if duplicate suppression is required
        if (isDuplicateSuppressionOff(info)) {
            addConsumerInfo(info);
        } else {
            synchronized (brokerService.getVmConnectorURI()) {
                addConsumerInfo(info);
            }
        }
    }

    /**
     * Handles a remote destination add/remove: rewrites the command so that it
     * originates from the local connection, extends its broker path with the remote
     * broker path, and queues it on {@code serialExecutor} for delivery to the local broker.
     *
     * @param destInfo the remote destination command; it is modified in place
     * @param networkTTL maximum number of network hops; a negative value disables the limit
     */
    private void onRemoteDestinationInfoAdvisory(final DestinationInfo destInfo, int networkTTL) {
        final BrokerId[] path = destInfo.getBrokerPath();
        if (path != null && networkTTL > -1 && path.length >= networkTTL) {
            LOG.debug("{} Ignoring destination {} restricted to {} network hops only",
                    configuration.getBrokerName(), destInfo, networkTTL);
            return;
        }
        if (contains(path, localBrokerPath[0])) {
            LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
            return;
        }

        destInfo.setConnectionId(localConnectionInfo.getConnectionId());
        if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
            // re-set connection id so comes from here
            ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
            tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
        }
        destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));

        final String operation = destInfo.isAddOperation() ? "add" : "remove";
        LOG.trace("{} bridging {} destination on {} from {}, destination: {}",
                configuration.getBrokerName(), operation, localBroker, remoteBrokerName, destInfo);
        if (destInfo.isRemoveOperation()) {
            // not synced with addSubs so we will need to ignore any potential new subs with a timeout!=0
            destInfo.setTimeout(1);
        }

        // Serialize both add/remove dest with removeSub operations such that all removeSub advisories are generated
        serialExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    localBroker.oneway(destInfo);
                } catch (IOException e) {
                    // This runnable delivers both add and remove commands, so report the actual operation.
                    LOG.warn("failed to deliver {} command for destination: {}", operation, destInfo.getDestination(), e);
                }
            }
        });
    }

    /**
     * Handles a remote consumer being removed. If the id maps to a set of composite
     * consumer ids, each of those is removed instead; otherwise the demand subscription
     * for the id is removed and any forced-durable registration of it is cleaned up.
     *
     * @param removeInfo the removal command; its object id is cast to {@code ConsumerId}
     * @throws IOException if removing a subscription fails
     */
    private void onRemoteRemoveInfoAdvisory(RemoveInfo removeInfo) throws IOException {
        final ConsumerId id = (ConsumerId) removeInfo.getObjectId();

        // If we have an entry in compositeConsumerIds then this consumer was a
        // composite consumer and we need to remove the entries in the set and
        // not the consumer id we received here
        final Set<ConsumerId> compositeIds = compositeConsumerIds.remove(id);
        if (compositeIds != null) {
            for (ConsumerId compositeId : compositeIds) {
                serviceRemoteConsumerAdvisory(new RemoveInfo(compositeId));
            }
            return;
        }

        removeDemandSubscription(id);

        if (forcedDurableRemoteId.remove(id)) {
            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                DemandSubscription ds = i.next();
                if (ds.removeForcedDurableConsumer(id)) {
                    cleanupDurableSub(ds, i);
                }
            }
        }
    }

    /**
     * Handles a remote durable subscription being removed. If the subscription maps to
     * a set of composite subscriptions, each of those is removed instead; otherwise the
     * subscription is dropped from every demand subscription that references it.
     *
     * @param info the remote unsubscribe command
     * @throws IOException if cleaning up a durable subscription fails
     */
    private void onRemoteRemoveSubscriptionInfoAdvisory(RemoveSubscriptionInfo info) throws IOException {
        final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());

        // If we have an entry in compositeSubscriptions then this consumer was a
        // composite consumer and we need to remove the entries in the set and not
        // the subscription that we received here
        final Set<SubscriptionInfo> compositeSubs = this.compositeSubscriptions.remove(subscriptionInfo);
        if (compositeSubs != null) {
            for (SubscriptionInfo compositeSub : compositeSubs) {
                RemoveSubscriptionInfo remove = new RemoveSubscriptionInfo();
                remove.setClientId(compositeSub.getClientId());
                remove.setSubscriptionName(compositeSub.getSubscriptionName());
                remove.setConnectionId(this.localConnectionInfo.getConnectionId());
                serviceRemoteConsumerAdvisory(remove);
            }
            return;
        }

        final boolean proxyBridgeSub = isProxyBridgeSubscription(subscriptionInfo.getClientId(),
                subscriptionInfo.getSubscriptionName());
        // Alternate lookup key carrying the proxy bridge client id. It is built lazily and
        // kept separate from subscriptionInfo so that every iteration still tries the
        // original client id first and the id is translated exactly once.
        SubscriptionInfo proxySubscriptionInfo = null;
        for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
            DemandSubscription ds = i.next();
            boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);

            // If this is a proxy bridge subscription we need to try changing the clientId
            if (!removed && proxyBridgeSub) {
                if (proxySubscriptionInfo == null) {
                    proxySubscriptionInfo = new SubscriptionInfo(
                            getProxyBridgeClientId(info.getClientId()), info.getSubscriptionName());
                }
                removed = ds.getDurableRemoteSubs().remove(proxySubscriptionInfo);
            }

            if (removed) {
                cleanupDurableSub(ds, i);
            }
        }
    }