public void doRouting()

in core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/AbstractJMSFlow.java [441:501]


    public void doRouting(MessageExchangeImpl me) throws MessagingException {
        // let ActiveMQ do the routing ...
        try {
            String destination;
            if (me.getRole() == Role.PROVIDER) {
                if (me.getDestinationId() == null) {
                    destination = INBOUND_PREFIX + EndpointSupport.getKey(me.getEndpoint());
                } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_PROVIDER)) && !isSynchronous(me)) {
                    destination = INBOUND_PREFIX + me.getDestinationId().getName();
                } else {
                    destination = INBOUND_PREFIX + me.getDestinationId().getContainerName();
                }
            } else {
                if (me.getSourceId() == null) {
                    throw new IllegalStateException("No sourceId set on the exchange");
                } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_CONSUMER)) && !isSynchronous(me)) {
                    // If the consumer is stateless and has specified a sender
                    // endpoint,
                    // this exchange will be sent to the given endpoint queue,
                    // so that
                    // fail-over and load-balancing can be achieved
                    // This property must have been created using
                    // EndpointSupport.getKey
                    if (me.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
                        destination = INBOUND_PREFIX + me.getProperty(JbiConstants.SENDER_ENDPOINT);
                    } else {
                        destination = INBOUND_PREFIX + me.getSourceId().getName();
                    }
                } else {
                    destination = INBOUND_PREFIX + me.getSourceId().getContainerName();
                }
            }

            Connection cnx = connection;
            // with a PooledConnectionFactory get a new connection from the pool
            boolean useConnectionFromPool = (connectionFactory instanceof PooledConnectionFactory)
                && ((PooledConnectionFactory)connectionFactory).getMaxConnections() > 1;
            if (useConnectionFromPool) {
                cnx = connectionFactory.createConnection();
                cnx.start();
            }
            
            Session inboundSession = cnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
            try {
                Queue queue = inboundSession.createQueue(destination);
                ObjectMessage msg = inboundSession.createObjectMessage(me);
                // Set message priority.
                Integer priority = (Integer) me.getProperty(JbiConstants.MESSAGE_PRIORITY);
                if (null != priority) {
                    msg.setJMSPriority(priority);
                }
                MessageProducer queueProducer = inboundSession.createProducer(queue);
                queueProducer.send(msg);
            } finally {
                inboundSession.close();
            }
        } catch (JMSException e) {
            LOGGER.error("Failed to send exchange: " + me + " internal JMS Network", e);
            throw new MessagingException(e);
        }
    }