in artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java [1155:1274]
/**
 * Routes one message, either through the supplied {@code bindingMove} or through
 * {@code simpleRoute} on the address taken from the routing context.
 * <p>
 * Sequence of work, in order:
 * <ol>
 * <li>reject a message whose reference count is already above zero;</li>
 * <li>apply the expiry delay when address settings match the address;</li>
 * <li>run the duplicate-ID check when the context asks for duplicate detection;</li>
 * <li>clear AMQP properties (only when the context has no mirror source) and internal properties;</li>
 * <li>populate the context via {@code bindingMove} or {@code simpleRoute};</li>
 * <li>notify broker message plugins ({@code beforeMessageRoute}), process the route, then notify
 * them again ({@code afterMessageRoute}, or {@code onMessageRouteException} on failure).</li>
 * </ol>
 *
 * @param message          the message to route; its reference count must be zero
 * @param context          routing context supplying the address, duplicate-detection flag, mirror source and transaction
 * @param direct           forwarded to {@code processRoute} and to the broker message plugins
 * @param rejectDuplicates forwarded to {@code checkDuplicateID} and to the broker message plugins
 * @param bindingMove      when non-null the message is routed through this binding and {@code simpleRoute} is skipped
 * @param sendToDLA        forwarded to {@code maybeSendToDLA} when {@code simpleRoute} yields no bindings
 * @return {@code DUPLICATED_ID} when the duplicate check reports {@code DuplicateNotStartedTX}; the value
 *         returned by {@code maybeSendToDLA} when no bindings were found; {@code OK} otherwise
 * @throws IllegalStateException if the message was already routed, or the duplicate check returns an unexpected value
 * @throws Exception             any failure raised while routing; failures raised after the
 *                               {@code beforeMessageRoute} callback are reported to the plugins before being rethrown
 */
private RoutingStatus route(final Message message,
                            final RoutingContext context,
                            final boolean direct,
                            final boolean rejectDuplicates,
                            final Binding bindingMove,
                            final boolean sendToDLA) throws Exception {
   // Guard: a positive reference count means this message went through routing before.
   if (message.getRefCount() > 0) {
      throw new IllegalStateException("Message cannot be routed more than once");
   }

   final SimpleString address = context.getAddress(message);
   final AddressSettings matchedSettings = addressSettingsRepository.getMatch(address.toString());
   if (matchedSettings != null) {
      applyExpiryDelay(message, matchedSettings);
   }

   // Set only when the duplicate check answers NoDuplicateStartedTX; in that case this method
   // commits (or rolls back) the context's transaction itself further down.
   boolean startedTX = false;
   if (context.isDuplicateDetection()) {
      final DuplicateCheckResult dupCheck = checkDuplicateID(message, context, rejectDuplicates);
      switch (dupCheck) {
         case NoDuplicateStartedTX:
            startedTX = true;
            break;
         case NoDuplicateNotStartedTX:
            break;
         case DuplicateNotStartedTX:
            return RoutingStatus.DUPLICATED_ID;
         default:
            throw new IllegalStateException("Unexpected value: " + dupCheck);
      }
   }

   // AMQP properties are kept when the context carries a mirror source.
   if (context.getMirrorSource() == null) {
      message.clearAMQPProperties();
   }
   message.clearInternalProperties();

   final AddressInfo addressInfo = checkAddress(context, address);

   final RoutingStatus status;
   if (bindingMove == null) {
      final Bindings resolved = simpleRoute(address, context, message, addressInfo);
      if (logger.isDebugEnabled()) {
         if (resolved == null) {
            logger.debug("PostOffice::simpleRoute null as bindings");
         } else {
            logger.debug("PostOffice::simpleRoute returned bindings with size = {}", resolved.getBindings().size());
         }
      }
      if (resolved != null) {
         status = RoutingStatus.OK;
      } else {
         context.setReusable(false);
         context.clear();
         if (addressInfo != null) {
            addressInfo.incrementUnRoutedMessageCount();
         }
         // Logged at debug rather than warn: having no bindings can be a normal situation on
         // publish-subscribe queues (or topic subscriptions on JMS).
         logger.debug("Couldn't find any bindings for address={} on message={}", address, message);
         status = RoutingStatus.NO_BINDINGS;
      }
   } else {
      // Explicit move: reset the context and let the given binding do the routing.
      context.clear();
      context.setReusable(false);
      bindingMove.route(message, context);
      if (addressInfo != null) {
         addressInfo.incrementRoutedMessageCount();
      }
      status = RoutingStatus.OK;
   }

   if (server.hasBrokerMessagePlugins()) {
      server.callBrokerMessagePlugins(p -> p.beforeMessageRoute(message, context, direct, rejectDuplicates));
   }

   logger.trace("Message after routed={}\n{}", message, context);

   final RoutingStatus outcome;
   try {
      if (status != RoutingStatus.NO_BINDINGS) {
         outcome = status;
         try {
            if (context.getQueueCount() > 0) {
               processRoute(message, context, direct);
            } else if (message.isLargeMessage()) {
               // The context holds no queues, so the large message's file is deleted.
               ((LargeServerMessage) message).deleteFile();
            }
         } catch (ActiveMQAddressFullException addressFull) {
            // Roll back a transaction started here; otherwise only flag an existing one.
            if (startedTX) {
               context.getTransaction().rollback();
            } else if (context.getTransaction() != null) {
               context.getTransaction().markAsRollbackOnly(addressFull);
            }
            throw addressFull;
         }
      } else {
         outcome = maybeSendToDLA(message, context, address, sendToDLA);
      }

      if (startedTX) {
         context.getTransaction().commit();
      }

      if (server.hasBrokerMessagePlugins()) {
         server.callBrokerMessagePlugins(p -> p.afterMessageRoute(message, context, direct, rejectDuplicates, outcome));
      }
      return outcome;
   } catch (Exception routeFailure) {
      if (server.hasBrokerMessagePlugins()) {
         server.callBrokerMessagePlugins(p -> p.onMessageRouteException(message, context, direct, rejectDuplicates, routeFailure));
      }
      throw routeFailure;
   }
}