in core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java [510:553]
public void doRouting(final MessageExchangeImpl me) throws MessagingException {
// let ActiveMQ do the routing ...
try {
String destination;
if (me.getRole() == Role.PROVIDER) {
if (me.getDestinationId() == null) {
destination = INBOUND_PREFIX + EndpointSupport.getKey(me.getEndpoint());
} else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_PROVIDER)) && !isSynchronous(me)) {
destination = INBOUND_PREFIX + me.getDestinationId().getName();
} else {
destination = INBOUND_PREFIX + me.getDestinationId().getContainerName();
}
} else {
if (me.getSourceId() == null) {
throw new IllegalStateException("No sourceId set on the exchange");
} else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_CONSUMER)) && !isSynchronous(me)) {
// If the consumer is stateless and has specified a sender
// endpoint,
// this exchange will be sent to the given endpoint queue,
// so that
// This property must have been created using
// EndpointSupport.getKey
// fail-over and load-balancing can be achieved
if (me.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
destination = INBOUND_PREFIX + me.getProperty(JbiConstants.SENDER_ENDPOINT);
} else {
destination = INBOUND_PREFIX + me.getSourceId().getName();
}
} else {
destination = INBOUND_PREFIX + me.getSourceId().getContainerName();
}
}
if (me.isTransacted()) {
me.setTxState(MessageExchangeImpl.TX_STATE_ENLISTED);
}
sendJmsMessage(new ActiveMQQueue(destination), me, isPersistent(me), me.isTransacted());
} catch (JMSException e) {
LOGGER.error("Failed to send exchange: " + me + " internal JMS Network", e);
throw new MessagingException(e);
} catch (SystemException e) {
LOGGER.error("Failed to send exchange: " + me + " transaction problem", e);
throw new MessagingException(e);
}
}