in bindings/servicemix-jms/src/main/java/org/apache/servicemix/jms/standard/StandardProviderProcessor.java [63:131]
/**
 * Forwards a JBI exchange to the configured JMS {@code destination}.
 *
 * <p>Exchanges whose status is already {@code DONE} or {@code ERROR} are ignored.
 * For {@code InOnly} / {@code RobustInOnly} exchanges the message is sent, the
 * exchange is marked {@code DONE} and handed back to the delivery channel.
 * For {@code InOut} exchanges the message is sent with a reply-to destination
 * (the configured {@code replyToDestination}, or a temporary queue/topic created
 * for this call), the reply is awaited, converted into the exchange's out message
 * (or set as the exchange error when the reply is an {@code ObjectMessage}
 * carrying an {@code Exception}), and the exchange is sent back on the channel.
 *
 * <p>A temporary reply destination created by this method is deleted once the
 * session has been closed, so that it does not outlive the exchange.
 *
 * @param exchange the exchange to process
 * @throws UnsupportedOperationException if the reply is an {@code ObjectMessage}
 *         whose payload is not an {@code Exception}
 * @throws IllegalArgumentException if the reply is neither an object, text nor
 *         bytes message
 * @throws IllegalStateException if the exchange pattern is not supported
 * @throws Exception on any JMS, marshaling or delivery failure
 */
public void process(MessageExchange exchange) throws Exception {
    if (exchange.getStatus() == ExchangeStatus.DONE) {
        return;
    } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
        return;
    }
    Session session = null;
    // Non-null only when this call created a temporary reply destination;
    // it must be deleted explicitly because closing the session does not remove it.
    Destination temporaryReplyDestination = null;
    try {
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(destination);
        Message msg = createMessageFromExchange(session, exchange);
        if (exchange instanceof InOnly || exchange instanceof RobustInOnly) {
            producer.send(msg);
            exchange.setStatus(ExchangeStatus.DONE);
            channel.send(exchange);
        } else if (exchange instanceof InOut) {
            Destination replyDestination;
            if (replyToDestination != null) {
                // NOTE(review): the shared reply destination is consumed without a
                // JMSCorrelationID selector, so concurrent exchanges could receive
                // each other's replies - confirm how repliers set the correlation id
                // before adding a selector.
                replyDestination = replyToDestination;
            } else {
                if (destination instanceof Queue) {
                    replyDestination = session.createTemporaryQueue();
                } else {
                    replyDestination = session.createTemporaryTopic();
                }
                temporaryReplyDestination = replyDestination;
            }
            MessageConsumer consumer = session.createConsumer(replyDestination);
            msg.setJMSCorrelationID(exchange.getExchangeId());
            msg.setJMSReplyTo(replyDestination);
            producer.send(msg);
            // NOTE(review): receive() has no timeout, so this call waits for as long
            // as no reply arrives - consider a configurable timeout.
            Message message = consumer.receive();
            if (message instanceof ObjectMessage) {
                Object o = ((ObjectMessage) message).getObject();
                if (o instanceof Exception) {
                    exchange.setError((Exception) o);
                } else {
                    // The payload may be null; report that instead of failing with an NPE.
                    String typeName = (o == null) ? "null" : o.getClass().getName();
                    throw new UnsupportedOperationException("Can not handle objects of type " + typeName);
                }
            } else {
                InputStream is = null;
                if (message instanceof TextMessage) {
                    // NOTE(review): getBytes() uses the platform default charset; left
                    // unchanged because the producing side may rely on the same default
                    // - confirm before pinning an explicit charset.
                    is = new ByteArrayInputStream(((TextMessage) message).getText().getBytes());
                } else if (message instanceof BytesMessage) {
                    int length = (int) ((BytesMessage) message).getBodyLength();
                    byte[] bytes = new byte[length];
                    ((BytesMessage) message).readBytes(bytes);
                    is = new ByteArrayInputStream(bytes);
                } else {
                    throw new IllegalArgumentException("JMS message should be a text or bytes message");
                }
                String contentType = message.getStringProperty(CONTENT_TYPE);
                SoapMessage soap = soapHelper.getSoapMarshaler().createReader().read(is, contentType);
                NormalizedMessage out = exchange.createMessage();
                soapHelper.getJBIMarshaler().toNMS(out, soap);
                ((InOut) exchange).setOutMessage(out);
            }
            channel.send(exchange);
        } else {
            throw new IllegalStateException(exchange.getPattern() + " not implemented");
        }
    } finally {
        try {
            if (session != null) {
                // Closing the session also closes the reply consumer, which must
                // happen before the temporary destination can be deleted.
                session.close();
            }
        } finally {
            if (temporaryReplyDestination instanceof javax.jms.TemporaryQueue) {
                ((javax.jms.TemporaryQueue) temporaryReplyDestination).delete();
            } else if (temporaryReplyDestination instanceof javax.jms.TemporaryTopic) {
                ((javax.jms.TemporaryTopic) temporaryReplyDestination).delete();
            }
        }
    }
}