in engines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java [179:255]
/**
 * Processes an exchange that carries the routing slip bookkeeping properties
 * ({@code correlation}, {@code index} and optionally {@code previous}) and
 * decides what to do next based on its status and content:
 * <ul>
 *   <li>ERROR: the exchange stored under the correlation id is failed with
 *       the received error;</li>
 *   <li>fault: the fault is transferred to the exchange stored under the
 *       correlation id, which is then sent;</li>
 *   <li>out message from the last target: the out message is transferred to
 *       the exchange stored under the correlation id, which is then sent;</li>
 *   <li>out message from any other target: a new InOut exchange is created,
 *       configured for the next target and sent with the out message as its
 *       in message.</li>
 * </ul>
 * In every handled case the exchange stored under the {@code previous} id,
 * if any, is acknowledged afterwards.
 *
 * @param exchange the exchange received back from a target
 * @throws IllegalStateException if the {@code correlation} or {@code index}
 *         property is missing, if the exchange status is DONE, or if the
 *         exchange carries neither an out message nor a fault
 * @throws Exception if loading/storing exchanges or sending fails
 */
private void processConsumerAsync(MessageExchange exchange) throws Exception {
    String correlationId = (String) exchange.getProperty(correlation);
    String previousId = (String) exchange.getProperty(previous);
    Integer prevIndex = (Integer) exchange.getProperty(index);
    if (correlationId == null) {
        throw new IllegalStateException(correlation + " property not found");
    }
    if (prevIndex == null) {
        // The missing property here is the index, not the (optional) previous id
        throw new IllegalStateException(index + " property not found");
    }

    // This should never happen, as we can only send DONE
    if (exchange.getStatus() == ExchangeStatus.DONE) {
        throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE);

    // ERROR are sent back to the consumer
    } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
        MessageExchange me = (MessageExchange) store.load(correlationId);
        fail(me, exchange.getError());
        ackPreviousTarget(previousId);

    // Faults are sent back to the consumer
    } else if (exchange.getFault() != null) {
        MessageExchange me = (MessageExchange) store.load(correlationId);
        // Re-key the correlation on the current exchange and keep it in the
        // store so it can be looked up when the answer to 'me' comes back
        me.setProperty(correlation, exchange.getExchangeId());
        store.store(exchange.getExchangeId(), exchange);
        MessageUtil.transferFaultToFault(exchange, me);
        send(me);
        ackPreviousTarget(previousId);

    // Out message, give it to next target or back to consumer
    } else if (exchange.getMessage("out") != null) {
        if (prevIndex.intValue() == targets.length - 1) {
            // This is the answer from the last target
            MessageExchange me = (MessageExchange) store.load(correlationId);
            me.setProperty(correlation, exchange.getExchangeId());
            store.store(exchange.getExchangeId(), exchange);
            MessageUtil.transferOutToOut(exchange, me);
            send(me);
            ackPreviousTarget(previousId);
        } else {
            // We still have a target to hit
            MessageExchange me = getExchangeFactory().createInOutExchange();
            Integer curIndex = Integer.valueOf(prevIndex.intValue() + 1);
            me.setProperty(correlation, correlationId);
            me.setProperty(index, curIndex);
            me.setProperty(previous, exchange.getExchangeId());
            targets[curIndex.intValue()].configureTarget(me, getContext());
            store.store(exchange.getExchangeId(), exchange);
            MessageUtil.transferOutToIn(exchange, me);
            try {
                send(me);
            } catch (RuntimeException re) {
                // Send delivery channel errors back to the calling endpoint;
                // correlationId is known to be non-null at this point
                me = (MessageExchange) store.load(correlationId);
                fail(me, re);
            }
            ackPreviousTarget(previousId);
        }

    // This should not happen
    } else {
        throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
                + " but has no Out nor Fault message");
    }
}

/**
 * Acknowledges the previous target: loads the exchange stored under the
 * given id and hands it to {@code done}. Does nothing when no previous id
 * was recorded.
 *
 * @param previousId id of the stored exchange to acknowledge, may be {@code null}
 * @throws Exception if the exchange cannot be loaded or acknowledged
 */
private void ackPreviousTarget(String previousId) throws Exception {
    if (previousId != null) {
        MessageExchange prevExchange = (MessageExchange) store.load(previousId);
        done(prevExchange);
    }
}