in jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java [704:873]
/**
 * Relays the current state of an exchange to the cluster as a JMS message.
 * <p>
 * The whole call runs while holding the monitor of {@code requestor}. After
 * {@code decrementPendingExchangeIfNeeded} has been invoked, the exchange
 * status selects what happens:
 * <ul>
 * <li><b>Active</b>: the fault, else the out, else the in message is wrapped
 * in an object message tagged with its kind. For an in message the routing
 * metadata (MEP, interface, operation, service, endpoint) is copied to JMS
 * properties and the engine-level rollback flag is recorded on the exchange.
 * When an answer is expected the exchange is stored in {@code exchanges}
 * under its id before sending; otherwise {@code done(exchange)} is called
 * right after sending.</li>
 * <li><b>Done</b>: an empty message tagged as DONE is sent, unless
 * rollback-on-errors is active and the MEP/role combination suppresses it.</li>
 * <li><b>Error</b>: the exchange error is sent as an object message, unless
 * rollback-on-errors is active and the MEP/role/transaction combination
 * suppresses it, in which case the requestor is marked rollback-only.</li>
 * </ul>
 * Every outgoing message carries this engine's name as sender; if the
 * requestor holds an inbound message, its sender name and correlation id are
 * echoed back as the target cluster name and correlation id.
 *
 * @param requestor the JMS requestor used to create and send messages; also
 *                  used as the lock for this call
 * @param exchange  the exchange whose state is to be relayed
 * @throws IllegalStateException if an active exchange carries no message, if
 *                               the status is not Active, Done or Error, or
 *                               if a Done/Error exchange has an unsupported MEP
 * @throws Exception             any failure raised by the JMS or helper calls
 */
protected void processExchange(JmsRequestor requestor, Exchange exchange) throws Exception {
    synchronized (requestor) {
        decrementPendingExchangeIfNeeded(exchange);

        // Consumers use the flag previously stored on the exchange for this
        // cluster name; every other role uses the engine-wide setting.
        final String rollbackKey = PROPERTY_ROLLBACK_ON_ERRORS + "." + name;
        boolean rollback = exchange.getRole() == Role.Consumer
                ? Boolean.TRUE.equals(exchange.getProperty(rollbackKey))
                : this.rollbackOnErrors;

        final Status status = exchange.getStatus();
        if (status == Status.Active) {
            // Pick the payload by precedence: fault, then out, then in.
            int type = JBI_MESSAGE_FAULT;
            Message payload = exchange.getFault(false);
            if (payload == null) {
                type = JBI_MESSAGE_OUT;
                payload = exchange.getOut(false);
            }
            if (payload == null) {
                type = JBI_MESSAGE_IN;
                payload = exchange.getIn(false);
            }
            if (payload == null) {
                throw new IllegalStateException("No normalized message on an active exchange: " + exchange);
            }

            javax.jms.Message jms = requestor.getSession().createObjectMessage(payload);
            jms.setIntProperty(JBI_MESSAGE, type);

            if (type == JBI_MESSAGE_IN) {
                // An in message takes the engine-wide flag and records it on
                // the exchange under the per-cluster key.
                rollback = this.rollbackOnErrors;
                exchange.setProperty(rollbackKey, rollback);
                jms.setStringProperty(JBI_MEP, exchange.getPattern().getWsdlUri());

                Object interfaceName = exchange.getProperty(MessageExchangeImpl.INTERFACE_NAME_PROP);
                if (interfaceName != null) {
                    jms.setStringProperty(JBI_INTERFACE, interfaceName.toString());
                }
                Object operation = exchange.getOperation();
                if (operation != null) {
                    jms.setStringProperty(JBI_OPERATION, operation.toString());
                }
                Object serviceName = exchange.getProperty(MessageExchangeImpl.SERVICE_NAME_PROP);
                if (serviceName != null) {
                    jms.setStringProperty(JBI_SERVICE, serviceName.toString());
                }
                Object endpoint = exchange.getProperty(MessageExchangeImpl.SERVICE_ENDPOINT_PROP);
                if (endpoint != null) {
                    ServiceEndpoint se = (ServiceEndpoint) endpoint;
                    jms.setStringProperty(JBI_ENDPOINT,
                            "{" + se.getServiceName().toString() + "}" + se.getEndpointName());
                }
                // TODO: write exchange properties
            }
            jms.setBooleanProperty(PROPERTY_ROLLBACK_ON_ERRORS, rollback);

            // Without rollback-on-errors an answer is always expected;
            // otherwise it depends on the MEP and on our role.
            boolean expectResponse = true;
            if (rollback) {
                switch (exchange.getPattern()) {
                    case InOnly:
                        expectResponse = false;
                        break;
                    case RobustInOnly:
                    case InOut:
                        expectResponse = exchange.getRole() == Role.Provider;
                        break;
                    default:
                        // TODO: remaining MEPs keep expecting a response
                        break;
                }
            }

            if (expectResponse) {
                // Register before sending so the exchange can be looked up by id.
                exchanges.put(exchange.getId(), exchange);
            }
            jms.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
            jms.setStringProperty(PROPERTY_SENDER_CORR_ID, expectResponse ? exchange.getId() : null);
            javax.jms.Message inbound = requestor.getMessage();
            if (inbound != null) {
                jms.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME,
                        inbound.getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
                jms.setStringProperty(ClusterEngine.PROPERTY_CORR_ID,
                        inbound.getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
            }
            requestor.send(jms);
            if (!expectResponse) {
                // TODO: send done in the tx synchronization
                done(exchange);
            }
        } else if (status == Status.Done) {
            boolean sendDone = true;
            if (rollback) {
                switch (exchange.getPattern()) {
                    case InOnly:
                        // never send done for InOnly
                    case InOut:
                        // in an InOut mep, the DONE status always comes from the JBI consumer
                        sendDone = false;
                        break;
                    case RobustInOnly:
                        // only send done when there is no fault,
                        // which means the exchange has a consumer role
                        sendDone = exchange.getRole() == Role.Consumer;
                        break;
                    case InOptionalOut:
                        // TODO: always sent for now
                        break;
                    default:
                        throw new IllegalStateException("Unsupported MEP: " + exchange.getPattern());
                }
            }
            if (sendDone) {
                javax.jms.Message jms = requestor.getSession().createMessage();
                jms.setIntProperty(JBI_MESSAGE, JBI_MESSAGE_DONE);
                jms.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
                jms.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
                javax.jms.Message inbound = requestor.getMessage();
                if (inbound != null) {
                    jms.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME,
                            inbound.getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
                    jms.setStringProperty(ClusterEngine.PROPERTY_CORR_ID,
                            inbound.getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
                }
                requestor.send(jms);
            }
        } else if (status == Status.Error) {
            boolean sendError = true;
            if (rollback) {
                switch (exchange.getPattern()) {
                    case InOnly:
                        // never send errors for InOnly
                        sendError = false;
                        break;
                    case RobustInOnly:
                    case InOut:
                        // do not send exchange from the provider back to the consumer
                        sendError = pool.getTransacted() == Transacted.None
                                || exchange.getRole() != Role.Consumer;
                        break;
                    case InOptionalOut:
                        // TODO: always sent for now
                        break;
                    default:
                        throw new IllegalStateException("Unsupported MEP: " + exchange.getPattern());
                }
            }
            if (!sendError) {
                // Nothing goes on the wire; mark the requestor rollback-only instead.
                requestor.setRollbackOnly();
            } else {
                javax.jms.Message jms = requestor.getSession().createObjectMessage(exchange.getError());
                jms.setIntProperty(JBI_MESSAGE, JBI_MESSAGE_ERROR);
                jms.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
                jms.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
                javax.jms.Message inbound = requestor.getMessage();
                if (inbound != null) {
                    jms.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME,
                            inbound.getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
                    jms.setStringProperty(ClusterEngine.PROPERTY_CORR_ID,
                            inbound.getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
                }
                requestor.send(jms);
            }
        } else {
            throw new IllegalStateException("Unknown exchange status: " + exchange);
        }
    }
}