protected void serviceRemoteCommand()

in activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java [725:915]


    protected void serviceRemoteCommand(Command command) {
        if (!disposed.get()) {
            try {
                if (command.isMessageDispatch()) {
                    safeWaitUntilStarted();
                    MessageDispatch md = (MessageDispatch) command;
                    serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
                    ackAdvisory(md.getMessage());
                } else if (command.isBrokerInfo()) {
                    futureRemoteBrokerInfo.set((BrokerInfo) command);
                } else if (command instanceof BrokerSubscriptionInfo) {
                    final BrokerSubscriptionInfo brokerSubscriptionInfo = (BrokerSubscriptionInfo) command;

                    // Skip the durable sync if any of the following are true:
                    // 1) if the flag is set to false.
                    // 2) If dynamicOnly is true, this means means to only activate when the real
                    //    consumers come back so we need to skip. This mode is useful espeically when
                    //    setting TTL > 1 as the TTL info is tied to consumers
                    // 3) If conduit subscriptions is disable we also skip, for the same reason we
                    //    skip when dynamicOnly is true, that we need to let consumers entirely drive
                    //    the creation/removal of subscriptions as each consumer gets their own
                    if (!configuration.isSyncDurableSubs() || !configuration.isConduitSubscriptions()
                        || configuration.isDynamicOnly()) {
                        return;
                    }

                    //Start in a new thread so we don't block the transport waiting for staticDestinations
                    syncExecutor.execute(() -> {
                        try {
                            staticDestinationsLatch.await();

                            //Make sure after the countDown of staticDestinationsLatch we aren't stopping
                            if (!disposed.get() && started.get()) {
                                final BrokerSubscriptionInfo subInfo = brokerSubscriptionInfo;
                                LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
                                    brokerService.getBrokerName(), subInfo.getBrokerName());

                                // Go through and subs sent and see if we can add demand
                                if (subInfo.getSubscriptionInfos() != null) {
                                    // Re-add and process subscriptions on the remote broker to add demand
                                    for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
                                        // Brokers filter what is sent, but the filtering logic has changed between
                                        // versions, plus some durables sent are only processed for removes so we
                                        // need to filter what to process for adding demand
                                        if (NetworkBridgeUtils.matchesConfigForDurableSync(configuration,
                                            info.getClientId(), info.getSubscriptionName(), info.getDestination())) {
                                            serviceRemoteConsumerAdvisory(info);
                                        }
                                    }
                                }

                                //After processing demand to add, clean up any empty durables
                                for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                                    DemandSubscription ds = i.next();
                                    // This filters on destinations to see if we should process possible removal
                                    // based on the bridge configuration (included dests, TTL, etc).
                                    if (NetworkBridgeUtils.matchesDestinations(configuration.getDynamicallyIncludedDestinations(),
                                        ds.getLocalInfo().getDestination())) {
                                        // Note that this method will further check that there are no remote
                                        // demand that was previously added or associated. If there are remote
                                        // subscriptions tied to the DS, then it will not be removed.
                                        cleanupDurableSub(ds, i);
                                    }
                                }
                            }
                        } catch (Exception e) {
                            LOG.warn("Error processing BrokerSubscriptionInfo: {}", e.getMessage(), e);
                            LOG.debug(e.getMessage(), e);
                        }
                    });

                } else if (command.getClass() == ConnectionError.class) {
                    ConnectionError ce = (ConnectionError) command;
                    serviceRemoteException(ce.getException());
                } else {
                    if (isDuplex()) {
                        LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType());
                        if (command.isMessage()) {
                            final ActiveMQMessage message = (ActiveMQMessage) command;
                            if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
                                serviceRemoteConsumerAdvisory(message.getDataStructure());
                                ackAdvisory(message);
                            } else {
                                if (!isPermissableDestination(message.getDestination(), true)) {
                                    return;
                                }
                                safeWaitUntilStarted();
                                // message being forwarded - we need to
                                // propagate the response to our local send
                                if (canDuplexDispatch(message)) {
                                    message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
                                        duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
                                            final int correlationId = message.getCommandId();

                                            @Override
                                            public void onCompletion(FutureResponse resp) {
                                                try {
                                                    Response reply = resp.getResult();
                                                    reply.setCorrelationId(correlationId);
                                                    remoteBroker.oneway(reply);
                                                    //increment counter when messages are received in duplex mode
                                                    networkBridgeStatistics.getReceivedCount().increment();
                                                } catch (IOException error) {
                                                    LOG.error("Exception: {} on duplex forward of: {}", error, message);
                                                    serviceRemoteException(error);
                                                }
                                            }
                                        });
                                    } else {
                                        duplexInboundLocalBroker.oneway(message);
                                        networkBridgeStatistics.getReceivedCount().increment();
                                    }
                                    serviceInboundMessage(message);
                                } else {
                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
                                        Response reply = new Response();
                                        reply.setCorrelationId(message.getCommandId());
                                        remoteBroker.oneway(reply);
                                    }
                                }
                            }
                        } else {
                            switch (command.getDataStructureType()) {
                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
                                    if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
                                        // end of initiating connection setup - propogate to initial connection to get mbean by clientid
                                        duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
                                    } else {
                                        localBroker.oneway(command);
                                    }
                                    break;
                                case SessionInfo.DATA_STRUCTURE_TYPE:
                                    localBroker.oneway(command);
                                    break;
                                case ProducerInfo.DATA_STRUCTURE_TYPE:
                                    // using duplexInboundLocalProducerInfo
                                    break;
                                case MessageAck.DATA_STRUCTURE_TYPE:
                                    MessageAck ack = (MessageAck) command;
                                    DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
                                    if (localSub != null) {
                                        ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
                                        localBroker.oneway(ack);
                                    } else {
                                        LOG.warn("Matching local subscription not found for ack: {}", ack);
                                    }
                                    break;
                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
                                    localStartedLatch.await();
                                    if (started.get()) {
                                        final ConsumerInfo consumerInfo = (ConsumerInfo) command;
                                        if (isDuplicateSuppressionOff(consumerInfo)) {
                                            addConsumerInfo(consumerInfo);
                                        } else {
                                            synchronized (brokerService.getVmConnectorURI()) {
                                                addConsumerInfo(consumerInfo);
                                            }
                                        }
                                    } else {
                                        // received a subscription whilst stopping
                                        LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
                                    }
                                    break;
                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
                                    // initiator is shutting down, controlled case
                                    // abortive close dealt with by inactivity monitor
                                    LOG.info("Stopping network bridge on shutdown of remote broker");
                                    serviceRemoteException(new IOException(command.toString()));
                                    break;
                                default:
                                    LOG.debug("Ignoring remote command: {}", command);
                            }
                        }
                    } else {
                        switch (command.getDataStructureType()) {
                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
                                break;
                            default:
                                LOG.warn("Unexpected remote command: {}", command);
                        }
                    }
                }
            } catch (Throwable e) {
                LOG.debug("Exception processing remote command: {}", command, e);
                serviceRemoteException(e);
            }
        }
    }