in activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java [725:915]
protected void serviceRemoteCommand(Command command) {
if (!disposed.get()) {
try {
if (command.isMessageDispatch()) {
safeWaitUntilStarted();
MessageDispatch md = (MessageDispatch) command;
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
ackAdvisory(md.getMessage());
} else if (command.isBrokerInfo()) {
futureRemoteBrokerInfo.set((BrokerInfo) command);
} else if (command instanceof BrokerSubscriptionInfo) {
final BrokerSubscriptionInfo brokerSubscriptionInfo = (BrokerSubscriptionInfo) command;
// Skip the durable sync if any of the following are true:
// 1) if the flag is set to false.
// 2) If dynamicOnly is true, this means means to only activate when the real
// consumers come back so we need to skip. This mode is useful espeically when
// setting TTL > 1 as the TTL info is tied to consumers
// 3) If conduit subscriptions is disable we also skip, for the same reason we
// skip when dynamicOnly is true, that we need to let consumers entirely drive
// the creation/removal of subscriptions as each consumer gets their own
if (!configuration.isSyncDurableSubs() || !configuration.isConduitSubscriptions()
|| configuration.isDynamicOnly()) {
return;
}
//Start in a new thread so we don't block the transport waiting for staticDestinations
syncExecutor.execute(() -> {
try {
staticDestinationsLatch.await();
//Make sure after the countDown of staticDestinationsLatch we aren't stopping
if (!disposed.get() && started.get()) {
final BrokerSubscriptionInfo subInfo = brokerSubscriptionInfo;
LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
brokerService.getBrokerName(), subInfo.getBrokerName());
// Go through and subs sent and see if we can add demand
if (subInfo.getSubscriptionInfos() != null) {
// Re-add and process subscriptions on the remote broker to add demand
for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
// Brokers filter what is sent, but the filtering logic has changed between
// versions, plus some durables sent are only processed for removes so we
// need to filter what to process for adding demand
if (NetworkBridgeUtils.matchesConfigForDurableSync(configuration,
info.getClientId(), info.getSubscriptionName(), info.getDestination())) {
serviceRemoteConsumerAdvisory(info);
}
}
}
//After processing demand to add, clean up any empty durables
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = i.next();
// This filters on destinations to see if we should process possible removal
// based on the bridge configuration (included dests, TTL, etc).
if (NetworkBridgeUtils.matchesDestinations(configuration.getDynamicallyIncludedDestinations(),
ds.getLocalInfo().getDestination())) {
// Note that this method will further check that there are no remote
// demand that was previously added or associated. If there are remote
// subscriptions tied to the DS, then it will not be removed.
cleanupDurableSub(ds, i);
}
}
}
} catch (Exception e) {
LOG.warn("Error processing BrokerSubscriptionInfo: {}", e.getMessage(), e);
LOG.debug(e.getMessage(), e);
}
});
} else if (command.getClass() == ConnectionError.class) {
ConnectionError ce = (ConnectionError) command;
serviceRemoteException(ce.getException());
} else {
if (isDuplex()) {
LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType());
if (command.isMessage()) {
final ActiveMQMessage message = (ActiveMQMessage) command;
if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
serviceRemoteConsumerAdvisory(message.getDataStructure());
ackAdvisory(message);
} else {
if (!isPermissableDestination(message.getDestination(), true)) {
return;
}
safeWaitUntilStarted();
// message being forwarded - we need to
// propagate the response to our local send
if (canDuplexDispatch(message)) {
message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
final int correlationId = message.getCommandId();
@Override
public void onCompletion(FutureResponse resp) {
try {
Response reply = resp.getResult();
reply.setCorrelationId(correlationId);
remoteBroker.oneway(reply);
//increment counter when messages are received in duplex mode
networkBridgeStatistics.getReceivedCount().increment();
} catch (IOException error) {
LOG.error("Exception: {} on duplex forward of: {}", error, message);
serviceRemoteException(error);
}
}
});
} else {
duplexInboundLocalBroker.oneway(message);
networkBridgeStatistics.getReceivedCount().increment();
}
serviceInboundMessage(message);
} else {
if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
Response reply = new Response();
reply.setCorrelationId(message.getCommandId());
remoteBroker.oneway(reply);
}
}
}
} else {
switch (command.getDataStructureType()) {
case ConnectionInfo.DATA_STRUCTURE_TYPE:
if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
// end of initiating connection setup - propogate to initial connection to get mbean by clientid
duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
} else {
localBroker.oneway(command);
}
break;
case SessionInfo.DATA_STRUCTURE_TYPE:
localBroker.oneway(command);
break;
case ProducerInfo.DATA_STRUCTURE_TYPE:
// using duplexInboundLocalProducerInfo
break;
case MessageAck.DATA_STRUCTURE_TYPE:
MessageAck ack = (MessageAck) command;
DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
if (localSub != null) {
ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
localBroker.oneway(ack);
} else {
LOG.warn("Matching local subscription not found for ack: {}", ack);
}
break;
case ConsumerInfo.DATA_STRUCTURE_TYPE:
localStartedLatch.await();
if (started.get()) {
final ConsumerInfo consumerInfo = (ConsumerInfo) command;
if (isDuplicateSuppressionOff(consumerInfo)) {
addConsumerInfo(consumerInfo);
} else {
synchronized (brokerService.getVmConnectorURI()) {
addConsumerInfo(consumerInfo);
}
}
} else {
// received a subscription whilst stopping
LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
}
break;
case ShutdownInfo.DATA_STRUCTURE_TYPE:
// initiator is shutting down, controlled case
// abortive close dealt with by inactivity monitor
LOG.info("Stopping network bridge on shutdown of remote broker");
serviceRemoteException(new IOException(command.toString()));
break;
default:
LOG.debug("Ignoring remote command: {}", command);
}
}
} else {
switch (command.getDataStructureType()) {
case KeepAliveInfo.DATA_STRUCTURE_TYPE:
case WireFormatInfo.DATA_STRUCTURE_TYPE:
case ShutdownInfo.DATA_STRUCTURE_TYPE:
break;
default:
LOG.warn("Unexpected remote command: {}", command);
}
}
}
} catch (Throwable e) {
LOG.debug("Exception processing remote command: {}", command, e);
serviceRemoteException(e);
}
}
}