public void doRouting()

in core/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java [510:553]


    /**
     * Hands the exchange over to ActiveMQ by sending it to a queue whose name
     * is {@code INBOUND_PREFIX} followed by a key chosen from the exchange's
     * role and addressing information.
     * <p>
     * When the exchange role is {@code Role.PROVIDER} the key is, in order of
     * precedence: the endpoint key if no destination id is set, the
     * destination name for an asynchronous exchange flagged
     * {@code STATELESS_PROVIDER}, otherwise the destination container name.
     * For any other role the key is derived from the source id in the same
     * way, using the {@code STATELESS_CONSUMER} flag and, if present, the
     * {@code SENDER_ENDPOINT} property.
     * <p>
     * A transacted exchange is marked {@code TX_STATE_ENLISTED} before the
     * message is sent.
     *
     * @param me the exchange to route
     * @throws MessagingException if sending fails with a {@code JMSException}
     *             or a {@code SystemException}; the original exception is
     *             logged and wrapped
     * @throws IllegalStateException if the role is not {@code Role.PROVIDER}
     *             and the exchange carries no source id
     */
    public void doRouting(final MessageExchangeImpl me) throws MessagingException {
        // ActiveMQ performs the actual routing; we only pick the queue.
        try {
            final String queueName;
            if (me.getRole() != Role.PROVIDER) {
                if (me.getSourceId() == null) {
                    throw new IllegalStateException("No sourceId set on the exchange");
                }
                final boolean statelessAsync =
                        Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_CONSUMER)) && !isSynchronous(me);
                if (!statelessAsync) {
                    queueName = INBOUND_PREFIX + me.getSourceId().getContainerName();
                } else {
                    // If the consumer is stateless and has specified a sender
                    // endpoint, the exchange is sent to that endpoint's queue
                    // so that fail-over and load-balancing can be achieved.
                    // The property must have been created using
                    // EndpointSupport.getKey.
                    final Object senderEndpoint = me.getProperty(JbiConstants.SENDER_ENDPOINT);
                    queueName = senderEndpoint != null
                            ? INBOUND_PREFIX + senderEndpoint
                            : INBOUND_PREFIX + me.getSourceId().getName();
                }
            } else if (me.getDestinationId() == null) {
                queueName = INBOUND_PREFIX + EndpointSupport.getKey(me.getEndpoint());
            } else if (Boolean.TRUE.equals(me.getProperty(JbiConstants.STATELESS_PROVIDER)) && !isSynchronous(me)) {
                queueName = INBOUND_PREFIX + me.getDestinationId().getName();
            } else {
                queueName = INBOUND_PREFIX + me.getDestinationId().getContainerName();
            }

            // Record the enlisted state before the message leaves this flow.
            if (me.isTransacted()) {
                me.setTxState(MessageExchangeImpl.TX_STATE_ENLISTED);
            }

            final ActiveMQQueue targetQueue = new ActiveMQQueue(queueName);
            sendJmsMessage(targetQueue, me, isPersistent(me), me.isTransacted());
        } catch (JMSException jmsFailure) {
            LOGGER.error("Failed to send exchange: " + me + " internal JMS Network", jmsFailure);
            throw new MessagingException(jmsFailure);
        } catch (SystemException txFailure) {
            LOGGER.error("Failed to send exchange: " + me + " transaction problem", txFailure);
            throw new MessagingException(txFailure);
        }
    }