in bindings/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsProviderEndpoint.java [631:732]
/**
 * Processes an in-out exchange inside the given JMS session: sends the request
 * built from the {@code in} message and arranges for the reply to be handled.
 * <p>
 * The reply destination is resolved in this order:
 * <ol>
 *   <li>the destination returned by {@code replyDestinationChooser}; the reply is
 *       then received synchronously using a correlation-id message selector;</li>
 *   <li>the configured {@code replyDestination} / {@code replyDestinationName};
 *       the exchange is put in the store under the correlation id and the reply
 *       is left to the listener container (asynchronous mode);</li>
 *   <li>a temporary queue or topic created on the session; the reply is received
 *       synchronously without a selector, and the temporary destination is
 *       deleted before this method returns, whether it succeeds or fails.</li>
 * </ol>
 *
 * @param exchange the message exchange being processed
 * @param in       the normalized "in" message to convert into the JMS request
 * @param session  the JMS session used to create destinations and messages
 * @throws IllegalStateException if no reply is received in synchronous mode
 * @throws Exception if marshaling, sending, receiving or completing the exchange fails
 */
protected void processInOutInSession(final MessageExchange exchange,
                                     final NormalizedMessage in,
                                     final Session session) throws Exception {
    // TODO: the following code may not work in pub/sub domain with temporary topics.
    //   The reason is that the consumer is created after the request is sent.
    //   This means that the response may be sent before the consumer is created, which would
    //   mean the consumer will not receive the response as it does not use durable subscriptions.

    // Create destinations
    Destination dest = getDestination(exchange, in, session);

    // Find reply destination
    // If we use the replyDestination / replyDestinationName, set the async flag to true
    // to indicate we will use the listener container
    boolean asynchronous = false;
    boolean useSelector = true;
    // Indicates whether the replyTo destination is temporary or an explicitly specified one
    boolean isReplyDestTemporary = false;
    Destination replyDest = chooseDestination(exchange, in, session, replyDestinationChooser, null);
    if (replyDest == null) {
        useSelector = false;
        replyDest = chooseDestination(exchange, in, session, null,
                                      replyDestination != null ? replyDestination : replyDestinationName);
        if (replyDest != null) {
            asynchronous = true;
        } else {
            if (isPubSubDomain()) {
                replyDest = session.createTemporaryTopic();
            } else {
                replyDest = session.createTemporaryQueue();
            }
            isReplyDestTemporary = true;
        }
    }

    // Tracks whether the request/reply handling finished normally, so that a failure
    // while cleaning up the temporary destination never hides the original exception.
    boolean completed = false;
    try {
        // Create message and send it
        final Message sendJmsMsg = marshaler.createMessage(exchange, in, session);
        sendJmsMsg.setJMSReplyTo(replyDest);

        // handle correlation ID: use the JMS message id when one is already set,
        // otherwise fall back to the exchange id
        String correlationId = sendJmsMsg.getJMSMessageID() != null
                ? sendJmsMsg.getJMSMessageID() : exchange.getExchangeId();
        sendJmsMsg.setJMSCorrelationID(correlationId);

        if (asynchronous) {
            createAndStartListener();
            store.store(correlationId, exchange);
        }

        try {
            send(session, dest, sendJmsMsg);
        } catch (Exception e) {
            if (asynchronous) {
                // Load the exchange back using the same key it was stored under;
                // the exchange id only matches when no JMS message id was available.
                // NOTE(review): presumably load() also removes the entry - confirm against the Store contract.
                store.load(correlationId);
            }
            throw e;
        }

        if (!asynchronous) {
            // Create selector
            String selector = useSelector
                    ? (MSG_SELECTOR_START + sendJmsMsg.getJMSCorrelationID() + MSG_SELECTOR_END)
                    : null;

            // Receiving JMS Message, Creating and Returning NormalizedMessage out
            Message receiveJmsMsg = receiveSelected(session, replyDest, selector);
            if (receiveJmsMsg == null) {
                throw new IllegalStateException("Unable to receive response");
            }

            if (receiveJmsMsg.getBooleanProperty(AbstractJmsMarshaler.DONE_JMS_PROPERTY)) {
                exchange.setStatus(ExchangeStatus.DONE);
            } else if (receiveJmsMsg.getBooleanProperty(AbstractJmsMarshaler.ERROR_JMS_PROPERTY)) {
                // Error replies carry the exception as the payload of an ObjectMessage
                Exception e = (Exception) ((ObjectMessage) receiveJmsMsg).getObject();
                exchange.setError(e);
                exchange.setStatus(ExchangeStatus.ERROR);
            } else if (receiveJmsMsg.getBooleanProperty(AbstractJmsMarshaler.FAULT_JMS_PROPERTY)) {
                Fault fault = exchange.getFault();
                if (fault == null) {
                    fault = exchange.createFault();
                    exchange.setFault(fault);
                }
                marshaler.populateMessage(receiveJmsMsg, exchange, fault);
            } else {
                NormalizedMessage out = exchange.getMessage("out");
                if (out == null) {
                    out = exchange.createMessage();
                    exchange.setMessage(out, "out");
                }
                marshaler.populateMessage(receiveJmsMsg, exchange, out);
            }

            // Use a synchronous send only for still-active, transacted exchanges
            // that were flagged with the SEND_SYNC property
            boolean txSync = exchange.getStatus() == ExchangeStatus.ACTIVE
                    && exchange.isTransacted()
                    && Boolean.TRUE.equals(exchange.getProperty(JbiConstants.SEND_SYNC));
            if (txSync) {
                sendSync(exchange);
            } else {
                send(exchange);
            }
        }
        completed = true;
    } finally {
        // Delete temporary queue/topic immediately to avoid accumulation in case the
        // connection is never destroyed. This runs on failure paths too (marshaling or
        // send error, reply timeout, ...), which would otherwise leak the destination.
        if (isReplyDestTemporary) {
            try {
                if (isPubSubDomain()) {
                    ((TemporaryTopic) replyDest).delete();
                } else {
                    ((TemporaryQueue) replyDest).delete();
                }
            } catch (Exception cleanupFailure) {
                // Only surface the cleanup failure when nothing else went wrong;
                // otherwise the primary exception is the one the caller needs to see.
                if (completed) {
                    throw cleanupFailure;
                }
            }
        }
    }
}