in core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java [441:501]
public void doRouting(MessageExchangeImpl me) throws MessagingException {
// let ActiveMQ do the routing ...
try {
String destination;
if (me.getRole() == Role.PROVIDER) {
if (me.getDestinationId() == null) {
destination = INBOUND_PREFIX + EndpointSupport.getKey(me.getEndpoint());
} else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_PROVIDER)) && !isSynchronous(me)) {
destination = INBOUND_PREFIX + me.getDestinationId().getName();
} else {
destination = INBOUND_PREFIX + me.getDestinationId().getContainerName();
}
} else {
if (me.getSourceId() == null) {
throw new IllegalStateException("No sourceId set on the exchange");
} else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_CONSUMER)) && !isSynchronous(me)) {
// If the consumer is stateless and has specified a sender
// endpoint,
// this exchange will be sent to the given endpoint queue,
// so that
// fail-over and load-balancing can be achieved
// This property must have been created using
// EndpointSupport.getKey
if (me.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
destination = INBOUND_PREFIX + me.getProperty(JbiConstants.SENDER_ENDPOINT);
} else {
destination = INBOUND_PREFIX + me.getSourceId().getName();
}
} else {
destination = INBOUND_PREFIX + me.getSourceId().getContainerName();
}
}
Connection cnx = connection;
// with a PooledConnectionFactory get a new connection from the pool
boolean useConnectionFromPool = (connectionFactory instanceof PooledConnectionFactory)
&& ((PooledConnectionFactory)connectionFactory).getMaxConnections() > 1;
if (useConnectionFromPool) {
cnx = connectionFactory.createConnection();
cnx.start();
}
Session inboundSession = cnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
Queue queue = inboundSession.createQueue(destination);
ObjectMessage msg = inboundSession.createObjectMessage(me);
// Set message priority.
Integer priority = (Integer) me.getProperty(JbiConstants.MESSAGE_PRIORITY);
if (null != priority) {
msg.setJMSPriority(priority);
}
MessageProducer queueProducer = inboundSession.createProducer(queue);
queueProducer.send(msg);
} finally {
inboundSession.close();
}
} catch (JMSException e) {
LOGGER.error("Failed to send exchange: " + me + " internal JMS Network", e);
throw new MessagingException(e);
}
}