in activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java [735:913]
protected void serviceRemoteCommand(Command command) {
if (!disposed.get()) {
try {
if (command.isMessageDispatch()) {
safeWaitUntilStarted();
MessageDispatch md = (MessageDispatch) command;
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
ackAdvisory(md.getMessage());
} else if (command.isBrokerInfo()) {
futureRemoteBrokerInfo.set((BrokerInfo) command);
} else if (command instanceof BrokerSubscriptionInfo) {
final BrokerSubscriptionInfo brokerSubscriptionInfo = (BrokerSubscriptionInfo) command;
//Start in a new thread so we don't block the transport waiting for staticDestinations
syncExecutor.execute(new Runnable() {
@Override
public void run() {
try {
staticDestinationsLatch.await();
//Make sure after the countDown of staticDestinationsLatch we aren't stopping
if (!disposed.get()) {
BrokerSubscriptionInfo subInfo = brokerSubscriptionInfo;
LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
brokerService.getBrokerName(), subInfo.getBrokerName());
if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
&& !configuration.isDynamicOnly()) {
if (started.get()) {
if (subInfo.getSubscriptionInfos() != null) {
for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
//re-add any process any non-NC consumers that match the
//dynamicallyIncludedDestinations list
//Also re-add network consumers that are not part of this direct
//bridge (proxy of proxy bridges)
if((info.getSubscriptionName() == null || !isDirectBridgeConsumer(info)) &&
NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
serviceRemoteConsumerAdvisory(info);
}
}
}
//After re-added, clean up any empty durables
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = i.next();
if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) {
cleanupDurableSub(ds, i);
}
}
}
}
}
} catch (Exception e) {
LOG.warn("Error processing BrokerSubscriptionInfo: {}", e.getMessage(), e);
LOG.debug(e.getMessage(), e);
}
}
});
} else if (command.getClass() == ConnectionError.class) {
ConnectionError ce = (ConnectionError) command;
serviceRemoteException(ce.getException());
} else {
if (isDuplex()) {
LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType());
if (command.isMessage()) {
final ActiveMQMessage message = (ActiveMQMessage) command;
if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
serviceRemoteConsumerAdvisory(message.getDataStructure());
ackAdvisory(message);
} else {
if (!isPermissableDestination(message.getDestination(), true)) {
return;
}
safeWaitUntilStarted();
// message being forwarded - we need to
// propagate the response to our local send
if (canDuplexDispatch(message)) {
message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
final int correlationId = message.getCommandId();
@Override
public void onCompletion(FutureResponse resp) {
try {
Response reply = resp.getResult();
reply.setCorrelationId(correlationId);
remoteBroker.oneway(reply);
//increment counter when messages are received in duplex mode
networkBridgeStatistics.getReceivedCount().increment();
} catch (IOException error) {
LOG.error("Exception: {} on duplex forward of: {}", error, message);
serviceRemoteException(error);
}
}
});
} else {
duplexInboundLocalBroker.oneway(message);
networkBridgeStatistics.getReceivedCount().increment();
}
serviceInboundMessage(message);
} else {
if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
Response reply = new Response();
reply.setCorrelationId(message.getCommandId());
remoteBroker.oneway(reply);
}
}
}
} else {
switch (command.getDataStructureType()) {
case ConnectionInfo.DATA_STRUCTURE_TYPE:
if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
// end of initiating connection setup - propogate to initial connection to get mbean by clientid
duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
} else {
localBroker.oneway(command);
}
break;
case SessionInfo.DATA_STRUCTURE_TYPE:
localBroker.oneway(command);
break;
case ProducerInfo.DATA_STRUCTURE_TYPE:
// using duplexInboundLocalProducerInfo
break;
case MessageAck.DATA_STRUCTURE_TYPE:
MessageAck ack = (MessageAck) command;
DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
if (localSub != null) {
ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
localBroker.oneway(ack);
} else {
LOG.warn("Matching local subscription not found for ack: {}", ack);
}
break;
case ConsumerInfo.DATA_STRUCTURE_TYPE:
localStartedLatch.await();
if (started.get()) {
final ConsumerInfo consumerInfo = (ConsumerInfo) command;
if (isDuplicateSuppressionOff(consumerInfo)) {
addConsumerInfo(consumerInfo);
} else {
synchronized (brokerService.getVmConnectorURI()) {
addConsumerInfo(consumerInfo);
}
}
} else {
// received a subscription whilst stopping
LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
}
break;
case ShutdownInfo.DATA_STRUCTURE_TYPE:
// initiator is shutting down, controlled case
// abortive close dealt with by inactivity monitor
LOG.info("Stopping network bridge on shutdown of remote broker");
serviceRemoteException(new IOException(command.toString()));
break;
default:
LOG.debug("Ignoring remote command: {}", command);
}
}
} else {
switch (command.getDataStructureType()) {
case KeepAliveInfo.DATA_STRUCTURE_TYPE:
case WireFormatInfo.DATA_STRUCTURE_TYPE:
case ShutdownInfo.DATA_STRUCTURE_TYPE:
break;
default:
LOG.warn("Unexpected remote command: {}", command);
}
}
}
} catch (Throwable e) {
LOG.debug("Exception processing remote command: {}", command, e);
serviceRemoteException(e);
}
}
}