in engines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java [157:239]
/**
 * Processes an exchange asynchronously, in either of the two roles this endpoint plays.
 *
 * <p><b>Provider role</b> (the original request): only {@code InOnly} and
 * {@code RobustInOnly} exchanges are accepted; anything else is failed with an
 * {@link UnsupportedOperationException}. For an {@code ACTIVE} exchange, the exchange
 * and an acknowledgement counter (initially 0) are put in the store under the exchange
 * id, and a copy of the "in" message is sent to every configured recipient, tagged with
 * the recipient count, the recipient index and the correlation id.</p>
 *
 * <p><b>Consumer role</b> (an answer from one recipient): under the lock obtained for
 * the correlation id, the acknowledgement counter is loaded and, depending on the
 * answer (DONE, ERROR or fault) and on {@code reportErrors}, the original exchange is
 * either completed, failed, answered with the fault, or the counter is incremented and
 * stored again while more answers are awaited. The lock is removed from the lock
 * manager unless the counter was stored again for a later answer.</p>
 *
 * @param exchange the exchange to process
 * @throws Exception if the store or the exchange delivery fails
 */
protected void processAsync(MessageExchange exchange) throws Exception {
    if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
        String corrId = (String) exchange.getMessage("in").getProperty(RECIPIENT_LIST_CORRID);
        int count = (Integer) exchange.getMessage("in").getProperty(RECIPIENT_LIST_COUNT);
        Lock lock = lockManager.getLock(corrId);
        lock.lock();
        // Stays true unless the counter is put back in the store for a later answer.
        boolean removeLock = true;
        try {
            // A missing counter means the response to the original exchange has been sent
            // already. NOTE(review): this relies on store.load() consuming the entry --
            // confirm against the Store implementation in use.
            Integer acks = (Integer) store.load(corrId + ".acks");
            if (exchange.getStatus() == ExchangeStatus.DONE) {
                if (acks != null) {
                    removeLock = acknowledgeRecipientAnswer(corrId, acks, count);
                }
            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
                if (acks != null) {
                    if (reportErrors) {
                        // Propagate the first reported error to the original exchange.
                        MessageExchange me = (MessageExchange) store.load(corrId);
                        fail(me, exchange.getError());
                    } else {
                        // Errors are not reported: count the answer like a success.
                        removeLock = acknowledgeRecipientAnswer(corrId, acks, count);
                    }
                }
            } else if (exchange.getFault() != null) {
                if (acks != null) {
                    if (reportErrors) {
                        // Hand a copy of the fault back on the original exchange.
                        MessageExchange me = (MessageExchange) store.load(corrId);
                        MessageUtil.transferToFault(MessageUtil.copyFault(exchange), me);
                        send(me);
                    } else {
                        // Faults are not reported: count the answer like a success.
                        removeLock = acknowledgeRecipientAnswer(corrId, acks, count);
                    }
                }
                // A fault leaves the recipient's exchange open, so it is always closed
                // here, including when the fault itself is not reported upstream.
                done(exchange);
            }
        } finally {
            try {
                lock.unlock();
            } catch (Exception ex) {
                logger.info("Caught exception while attempting to release lock", ex);
            }
            if (removeLock) {
                lockManager.removeLock(corrId);
            }
        }
    } else {
        if (!(exchange instanceof InOnly) && !(exchange instanceof RobustInOnly)) {
            fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
        } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
            String corrId = exchange.getExchangeId();
            // Keep the original exchange and a zeroed counter until the recipients answer.
            store.store(corrId, exchange);
            store.store(corrId + ".acks", Integer.valueOf(0));
            NormalizedMessage in = MessageUtil.copyIn(exchange);
            Integer recipientCount = Integer.valueOf(recipients.length);
            for (int i = 0; i < recipients.length; i++) {
                MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
                recipients[i].configureTarget(me, getContext());
                in.setProperty(RECIPIENT_LIST_COUNT, recipientCount);
                in.setProperty(RECIPIENT_LIST_INDEX, Integer.valueOf(i));
                in.setProperty(RECIPIENT_LIST_CORRID, corrId);
                MessageUtil.transferToIn(in, me);
                send(me);
            }
        }
    }
}

/**
 * Counts one more recipient answer for the given correlation id.
 *
 * <p>If this answer is the last one expected, the original exchange is loaded from the
 * store and completed with {@code done}; otherwise the incremented counter is stored
 * again so that a later answer can pick it up.</p>
 *
 * @param corrId the correlation id (the id of the original exchange)
 * @param acks the number of answers counted before this one
 * @param count the total number of recipients
 * @return {@code true} if the original exchange was completed, {@code false} if the
 *         counter was stored again because more answers are expected
 * @throws Exception if the store or the exchange delivery fails
 */
private boolean acknowledgeRecipientAnswer(String corrId, int acks, int count) throws Exception {
    if (acks + 1 >= count) {
        MessageExchange me = (MessageExchange) store.load(corrId);
        done(me);
        return true;
    }
    store.store(corrId + ".acks", Integer.valueOf(acks + 1));
    return false;
}