in jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java [580:695]
/**
 * Dispatches an incoming JMS message according to its integer
 * {@code JBI_MESSAGE} property.
 * <ul>
 *   <li>{@code JBI_MESSAGE_DONE}: removes the correlated exchange from
 *       {@code exchanges} and passes it to {@code done}.</li>
 *   <li>{@code JBI_MESSAGE_ERROR}: removes the correlated exchange and passes it
 *       to {@code fail} together with the exception carried as the message's
 *       object payload.</li>
 *   <li>{@code JBI_MESSAGE_IN}: creates a new exchange from the message,
 *       registers it in {@code exchanges}, updates the pending-exchange counter
 *       and sends it.</li>
 *   <li>{@code JBI_MESSAGE_OUT} / {@code JBI_MESSAGE_FAULT}: looks up (without
 *       removing) the correlated exchange, sets the unmarshalled message as its
 *       out message or fault, and sends it.</li>
 * </ul>
 *
 * @param requestor the requestor giving access to the JMS message being processed
 * @throws JMSException if a JMS property or payload cannot be read
 * @throws IllegalStateException if the message type is unknown, a required
 *         correlation id or MEP property is missing, or no exchange is
 *         registered under the correlation id
 */
protected void process(JmsRequestor requestor) throws JMSException {
    javax.jms.Message message = requestor.getMessage();
    int type = message.getIntProperty(JBI_MESSAGE);
    switch (type) {
        case JBI_MESSAGE_DONE: {
            Exchange exchange = findCorrelatedExchange(message, true);
            done(exchange);
            break;
        }
        case JBI_MESSAGE_ERROR: {
            Exchange exchange = findCorrelatedExchange(message, true);
            // NOTE(review): both casts are unchecked; a message that is not an
            // ObjectMessage, or whose payload is not an Exception, surfaces as a
            // ClassCastException. The payload is also Java-deserialized, so this
            // relies on the JMS peer being trusted -- confirm.
            fail(exchange, (Exception) ((ObjectMessage) message).getObject());
            break;
        }
        case JBI_MESSAGE_IN: {
            Exchange exchange = createInboundExchange(message);
            exchanges.put(exchange.getId(), exchange);
            // Once the number of pending exchanges reaches the configured maximum,
            // flip the pause flag (only the thread that wins the CAS proceeds) and
            // invalidate the selector.
            if (pendingExchanges.incrementAndGet() >= maxPendingExchanges) {
                if (pauseConsumption.compareAndSet(false, true)) {
                    invalidateSelector();
                }
            }
            suspendAndSend(requestor, exchange);
            break;
        }
        case JBI_MESSAGE_OUT: {
            Exchange exchange = findCorrelatedExchange(message, false);
            Message msg = unmarshallMessage(message);
            exchange.setOut(msg);
            // Kept from the original flow: the exchange is (re-)registered under its own id.
            exchanges.put(exchange.getId(), exchange);
            suspendAndSend(requestor, exchange);
            break;
        }
        case JBI_MESSAGE_FAULT: {
            Exchange exchange = findCorrelatedExchange(message, false);
            Message msg = unmarshallMessage(message);
            exchange.setFault(msg);
            // Kept from the original flow: the exchange is (re-)registered under its own id.
            exchanges.put(exchange.getId(), exchange);
            suspendAndSend(requestor, exchange);
            break;
        }
        default: {
            throw new IllegalStateException("Received unknown message type: " + type);
        }
    }
}

/**
 * Resolves the exchange registered under the message's {@code PROPERTY_CORR_ID}.
 *
 * @param message the incoming JMS message
 * @param remove {@code true} to remove the exchange from {@code exchanges},
 *        {@code false} to only read it
 * @return the exchange registered under the correlation id, never {@code null}
 * @throws JMSException if the correlation property cannot be read
 * @throws IllegalStateException if the message has no correlation id or no
 *         exchange is registered under it
 */
private Exchange findCorrelatedExchange(javax.jms.Message message, boolean remove) throws JMSException {
    String corrId = message.getStringProperty(PROPERTY_CORR_ID);
    if (corrId == null) {
        throw new IllegalStateException("Incoming JMS message has no correlationId");
    }
    Exchange exchange = remove ? exchanges.remove(corrId) : exchanges.get(corrId);
    if (exchange == null) {
        throw new IllegalStateException("Exchange not found for id " + corrId);
    }
    return exchange;
}

/**
 * Builds a new exchange from a {@code JBI_MESSAGE_IN} JMS message: MEP,
 * rollback flag, optional addressing properties and the unmarshalled in message.
 *
 * @param message the incoming JMS message
 * @return the newly created exchange with its in message set
 * @throws JMSException if a JMS property or the payload cannot be read
 * @throws IllegalStateException if the message carries no {@code JBI_MEP} property
 */
private Exchange createInboundExchange(javax.jms.Message message) throws JMSException {
    String mep = message.getStringProperty(JBI_MEP);
    if (mep == null) {
        throw new IllegalStateException("Exchange MEP not found for JMS message " + message.getJMSMessageID());
    }
    Exchange exchange = getChannel().createExchange(Pattern.fromWsdlUri(mep));
    // The rollback flag is stored under a key suffixed with this engine's name.
    exchange.setProperty(PROPERTY_ROLLBACK_ON_ERRORS + "." + name, message.getBooleanProperty(PROPERTY_ROLLBACK_ON_ERRORS));
    if (message.propertyExists(JBI_INTERFACE)) {
        exchange.setProperty(MessageExchangeImpl.INTERFACE_NAME_PROP, QName.valueOf(message.getStringProperty(JBI_INTERFACE)));
    }
    if (message.propertyExists(JBI_OPERATION)) {
        exchange.setOperation(QName.valueOf(message.getStringProperty(JBI_OPERATION)));
    }
    if (message.propertyExists(JBI_SERVICE)) {
        exchange.setProperty(MessageExchangeImpl.SERVICE_NAME_PROP, QName.valueOf(message.getStringProperty(JBI_SERVICE)));
    }
    if (message.propertyExists(JBI_ENDPOINT)) {
        // The property is parsed as a QName: its local part is the endpoint name
        // and its namespace part is itself parsed as a QName for the lookup.
        QName q = QName.valueOf(message.getStringProperty(JBI_ENDPOINT));
        String e = q.getLocalPart();
        q = QName.valueOf(q.getNamespaceURI());
        ServiceEndpoint se = getEndpoint(q, e);
        // TODO: check that endpoint exists
        exchange.setProperty(MessageExchangeImpl.SERVICE_ENDPOINT_PROP, se);
    }
    // Re-process JBI addressing
    DeliveryChannelImpl.createTarget(getChannel().getNMR(), exchange);
    // TODO: read exchange properties
    Message msg = unmarshallMessage(message);
    exchange.setIn(msg);
    return exchange;
}

/**
 * Records the exchange id as this engine's correlation property, suspends the
 * requestor under that id, attaches the requestor's transaction (if any) to the
 * exchange and sends the exchange.
 *
 * @param requestor the requestor that delivered the JMS message
 * @param exchange the exchange to send
 * @throws JMSException declared for parity with {@link #process(JmsRequestor)}
 */
private void suspendAndSend(JmsRequestor requestor, Exchange exchange) throws JMSException {
    exchange.setProperty(PROPERTY_CORR_ID + "." + name, exchange.getId());
    requestor.suspend(exchange.getId());
    if (requestor.getTransaction() != null) {
        exchange.setProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME, requestor.getTransaction());
    }
    send(exchange);
}