protected void processInOutInSession()

in bindings/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java [631:732]


    protected void processInOutInSession(final MessageExchange exchange,
                                         final NormalizedMessage in,
                                         final Session session) throws Exception {
        // TODO: the following code may not work in pub/sub domain with temporary topics.
        // The reason is that the the consumer is created after the request is sent.
        // This mean that the response may be sent before the consumer is created, which would
        // mean the consumer will not receive the response as it does not use durable subscriptions.

        // Create destinations
        Destination dest = getDestination(exchange, in, session);
        // Find reply destination
        // If we use the replyDestination / replyDestinationName, set the async flag to true
        // to indicate we will use the listener container
        boolean asynchronous = false;
        boolean useSelector = true;
        // Indicate whether the replyTo destination is temporary or explicitely specified replyTo destination
        boolean isReplyDestTemporary = false;
        Destination replyDest = chooseDestination(exchange, in, session, replyDestinationChooser, null);
        if (replyDest == null) {
            useSelector = false;
            replyDest = chooseDestination(exchange, in, session, null,
                                          replyDestination != null ? replyDestination : replyDestinationName);
            if (replyDest != null) {
                asynchronous = true;
            } else {
                if (isPubSubDomain()) {
                    replyDest = session.createTemporaryTopic();
                } else {
                    replyDest = session.createTemporaryQueue();
                }
                isReplyDestTemporary = true;
            }
        }
        // Create message and send it
        final Message sendJmsMsg = marshaler.createMessage(exchange, in, session);
        sendJmsMsg.setJMSReplyTo(replyDest);
        // handle correlation ID
        String correlationId = sendJmsMsg.getJMSMessageID() != null ? sendJmsMsg.getJMSMessageID() : exchange.getExchangeId();
        sendJmsMsg.setJMSCorrelationID(correlationId);

        if (asynchronous) {
            createAndStartListener();
            store.store(correlationId, exchange);
        }

        try {
            send(session, dest, sendJmsMsg);
        } catch (Exception e) {
            if (asynchronous) {
                store.load(exchange.getExchangeId());
            }
            throw e;
        }

        if (!asynchronous) {
            // Create selector
            String selector = useSelector ? (MSG_SELECTOR_START + sendJmsMsg.getJMSCorrelationID() + MSG_SELECTOR_END) : null;
            // Receiving JMS Message, Creating and Returning NormalizedMessage out
            Message receiveJmsMsg = receiveSelected(session, replyDest, selector);
            if (receiveJmsMsg == null) {
                throw new IllegalStateException("Unable to receive response");
            }
            if (receiveJmsMsg.getBooleanProperty(AbstractJmsMarshaler.DONE_JMS_PROPERTY)) {
                exchange.setStatus(ExchangeStatus.DONE);
            } else if (receiveJmsMsg.getBooleanProperty(AbstractJmsMarshaler.ERROR_JMS_PROPERTY)) {
                Exception e = (Exception) ((ObjectMessage) receiveJmsMsg).getObject();
                exchange.setError(e);
                exchange.setStatus(ExchangeStatus.ERROR);
            } else if (receiveJmsMsg.getBooleanProperty(AbstractJmsMarshaler.FAULT_JMS_PROPERTY)) {
                Fault fault = exchange.getFault();
                if (fault == null) {
                    fault = exchange.createFault();
                    exchange.setFault(fault);
                }
                marshaler.populateMessage(receiveJmsMsg, exchange, fault);
            } else {
                NormalizedMessage out = exchange.getMessage("out");
                if (out == null) {
                    out = exchange.createMessage();
                    exchange.setMessage(out, "out");
                }
                marshaler.populateMessage(receiveJmsMsg, exchange, out);
            }
            boolean txSync = exchange.getStatus() == ExchangeStatus.ACTIVE
                                && exchange.isTransacted()
                                && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
            if (txSync) {
                sendSync(exchange);
            } else {
                send(exchange);
            }

            // delete temporary queue/topic immediately to avoid accumulation in case that the connection is never destroyed
            if (isReplyDestTemporary) {
                if (isPubSubDomain()) {
                    ((TemporaryTopic)replyDest).delete();
                } else {
                    ((TemporaryQueue)replyDest).delete();
                }
            }
        }
    }