in activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java [1198:1300]
/**
 * Processes one command on the local side of the bridge.
 * <p>
 * Nothing happens once the bridge is disposed. Otherwise the command is handled by kind:
 * <ul>
 * <li><b>Message dispatch</b> - waits for the bridge to be started, bumps the enqueue
 * statistic and looks up the demand subscription by the dispatch's consumer id. When a
 * subscription exists, the dispatch carries a message and
 * {@code incrementOutstandingResponses()} returns {@code true}, the message is either
 * suppressed (and acked locally), sent one-way as a duplex advisory, forwarded to the remote
 * broker (request/response for persistent or always-sync sends, one-way otherwise), or
 * skipped because its destination is not permissible. Otherwise only a debug line is
 * written.</li>
 * <li><b>BrokerInfo</b> - stored in {@code futureLocalBrokerInfo}.</li>
 * <li><b>ShutdownInfo</b> - logged, then {@code stop()} is called.</li>
 * <li><b>ConnectionError</b> - its exception is passed to {@code serviceLocalException}.</li>
 * <li><b>Anything else</b> - {@code WireFormatInfo} and {@code BrokerSubscriptionInfo} are
 * ignored; every other type is logged at WARN.</li>
 * </ul>
 * Each successful {@code incrementOutstandingResponses()} is paired with a
 * {@code decrementOutstandingResponses()} on every path that completes without handing the
 * message to the asynchronous response callback.
 * <p>
 * Any {@link Throwable} raised while processing is logged at WARN and passed to
 * {@code serviceLocalException(Throwable)}; nothing is rethrown to the caller.
 *
 * @param command the command to process
 */
protected void serviceLocalCommand(Command command) {
    if (!disposed.get()) {
        try {
            if (command.isMessageDispatch()) {
                safeWaitUntilStarted();
                networkBridgeStatistics.getEnqueues().increment();
                final MessageDispatch md = (MessageDispatch) command;
                final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
                // From here on a successful increment must be matched by exactly one decrement.
                if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
                    if (suppressMessageDispatch(md, sub)) {
                        LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{
                                configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
                        });
                        // still ack as it may be durable
                        try {
                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                        } finally {
                            sub.decrementOutstandingResponses();
                        }
                        return;
                    }

                    Message message = configureMessage(md);
                    // Only log the full message at trace level; the id is enough otherwise.
                    LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{
                            configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId())
                    });

                    if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
                        try {
                            // never request b/c they are eventually acked async
                            remoteBroker.oneway(message);
                        } finally {
                            sub.decrementOutstandingResponses();
                        }
                        return;
                    }

                    if (isPermissableDestination(md.getDestination())) {
                        if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
                            // The message was not sent using async send, so we should only
                            // ack the local broker when we get confirmation that the remote
                            // broker has received the message.
                            // NOTE(review): if asyncRequest throws synchronously the slot is not
                            // released here. The transport may already have invoked the callback
                            // in that case, so adding a release could double-decrement - confirm
                            // against the transport implementation before changing this.
                            remoteBroker.asyncRequest(message, new ResponseCallback() {
                                @Override
                                public void onCompletion(FutureResponse future) {
                                    try {
                                        Response response = future.getResult();
                                        if (response.isException()) {
                                            ExceptionResponse er = (ExceptionResponse) response;
                                            serviceLocalException(md, er.getException());
                                        } else {
                                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                            networkBridgeStatistics.getDequeues().increment();
                                        }
                                    } catch (IOException e) {
                                        serviceLocalException(md, e);
                                    } finally {
                                        // The callback owns the slot once the request is in flight.
                                        sub.decrementOutstandingResponses();
                                    }
                                }
                            });
                        } else {
                            // If the message was originally sent using async send, we will
                            // preserve that QOS by bridging it using an async send (small chance
                            // of message loss).
                            try {
                                remoteBroker.oneway(message);
                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                networkBridgeStatistics.getDequeues().increment();
                            } finally {
                                sub.decrementOutstandingResponses();
                            }
                        }
                        serviceOutbound(message);
                    } else {
                        // Nothing was sent, so no completion path will ever release the slot taken
                        // by incrementOutstandingResponses() above; release it here so the count
                        // does not leak.
                        sub.decrementOutstandingResponses();
                        // NOTE(review): the dispatch is not acked locally on this path (unchanged
                        // from the previous behaviour) - confirm whether it should be.
                        LOG.debug("{} message not forwarded to {} because destination {} is not permissible, message: {}", new Object[]{
                                configuration.getBrokerName(), remoteBrokerName, md.getDestination(), message.getMessageId()
                        });
                    }
                } else {
                    // Also reached when the dispatch has no message or the increment was refused.
                    LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
                }
            } else if (command.isBrokerInfo()) {
                futureLocalBrokerInfo.set((BrokerInfo) command);
            } else if (command.isShutdownInfo()) {
                LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
                stop();
            } else if (command.getClass() == ConnectionError.class) {
                ConnectionError ce = (ConnectionError) command;
                serviceLocalException(ce.getException());
            } else {
                switch (command.getDataStructureType()) {
                    case WireFormatInfo.DATA_STRUCTURE_TYPE:
                        // expected; nothing to do
                        break;
                    case BrokerSubscriptionInfo.DATA_STRUCTURE_TYPE:
                        // expected; nothing to do
                        break;
                    default:
                        LOG.warn("Unexpected local command: {}", command);
                }
            }
        } catch (Throwable e) {
            LOG.warn("Caught an exception processing local command", e);
            serviceLocalException(e);
        }
    }
}