protected void serviceLocalCommand()

in activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java [1198:1300]


    protected void serviceLocalCommand(Command command) {
        if (!disposed.get()) {
            try {
                if (command.isMessageDispatch()) {
                    safeWaitUntilStarted();
                    networkBridgeStatistics.getEnqueues().increment();
                    final MessageDispatch md = (MessageDispatch) command;
                    final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
                    if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {

                        if (suppressMessageDispatch(md, sub)) {
                            LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{
                                    configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
                            });
                            // still ack as it may be durable
                            try {
                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                            } finally {
                                sub.decrementOutstandingResponses();
                            }
                            return;
                        }

                        Message message = configureMessage(md);
                        LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{
                                configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId())
                        });
                        if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
                            try {
                                // never request b/c they are eventually                     acked async
                                remoteBroker.oneway(message);
                            } finally {
                                sub.decrementOutstandingResponses();
                            }
                            return;
                        }
                        if (isPermissableDestination(md.getDestination())) {
                           if (message.isPersistent() || configuration.isAlwaysSyncSend()) {

                              // The message was not sent using async send, so we should only
                              // ack the local broker when we get confirmation that the remote
                              // broker has received the message.
                              remoteBroker.asyncRequest(message, new ResponseCallback() {
                                 @Override
                                 public void onCompletion(FutureResponse future) {
                                    try {
                                       Response response = future.getResult();
                                       if (response.isException()) {
                                          ExceptionResponse er = (ExceptionResponse) response;
                                          serviceLocalException(md, er.getException());
                                       } else {
                                          localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                          networkBridgeStatistics.getDequeues().increment();
                                       }
                                    } catch (IOException e) {
                                       serviceLocalException(md, e);
                                    } finally {
                                       sub.decrementOutstandingResponses();
                                    }
                                 }
                              });

                           } else {
                              // If the message was originally sent using async send, we will
                              // preserve that QOS by bridging it using an async send (small chance
                              // of message loss).
                              try {
                                 remoteBroker.oneway(message);
                                 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                 networkBridgeStatistics.getDequeues().increment();
                              } finally {
                                 sub.decrementOutstandingResponses();
                              }
                           }
                           serviceOutbound(message);
                        }
                    } else {
                        LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
                    }
                } else if (command.isBrokerInfo()) {
                    futureLocalBrokerInfo.set((BrokerInfo) command);
                } else if (command.isShutdownInfo()) {
                    LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
                    stop();
                } else if (command.getClass() == ConnectionError.class) {
                    ConnectionError ce = (ConnectionError) command;
                    serviceLocalException(ce.getException());
                } else {
                    switch (command.getDataStructureType()) {
                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
                            break;
                        case BrokerSubscriptionInfo.DATA_STRUCTURE_TYPE:
                            break;
                        default:
                            LOG.warn("Unexpected local command: {}", command);
                    }
                }
            } catch (Throwable e) {
                LOG.warn("Caught an exception processing local command", e);
                serviceLocalException(e);
            }
        }
    }