in engines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java [304:376]
/**
 * Feeds one incoming exchange into the aggregation identified by its correlation id.
 *
 * <p>The in message is copied, the correlation id is resolved from it, and the lock
 * obtained from the lock manager for that id is held while the aggregation is loaded
 * from the store (or created), updated with the message, and then either sent or
 * stored again, optionally with a timeout timer scheduled.</p>
 *
 * @param exchange the incoming exchange whose in message is added to the aggregation
 * @throws IllegalArgumentException if no (or an empty) correlation id is obtained
 * @throws Exception any failure propagated by the store, the aggregation callbacks
 *                   or the exchange completion helpers
 */
private void processProvider(MessageExchange exchange) throws Exception {
    final String processCorrelationId = (String) exchange.getProperty(JbiConstants.CORRELATION_ID);
    final NormalizedMessage inMessage = MessageUtil.copyIn(exchange);
    final String correlationId = getCorrelationID(exchange, inMessage);
    if (correlationId == null || correlationId.length() == 0) {
        throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
    }

    // All work on a given aggregation happens while holding the lock for its id.
    final Lock aggregationLock = getLockManager().getLock(correlationId);
    aggregationLock.lock();
    // Stays true unless state for this id is written to the store below.
    boolean discardLock = true;
    try {
        Object aggregate = store.load(correlationId);
        Date expiry = null;
        if (aggregate != null) {
            // Existing aggregation: a timeout is only computed when rescheduling is enabled.
            if (isRescheduleTimeouts()) {
                expiry = getTimeout(aggregate);
            }
        } else if (!isAggregationClosed(correlationId)) {
            // Nothing stored and the aggregation is not closed: start a new one.
            aggregate = createAggregation(correlationId);
            expiry = getTimeout(aggregate);
        }

        if (aggregate == null) {
            // No aggregation is available for this exchange.
            // TODO: should we return an error here ?
            if (reportClosedAggregatesAsErrors) {
                fail(exchange, new ClosedAggregateException());
            } else {
                done(exchange);
            }
            return;
        }

        if (reportErrors) {
            // Keep the exchange in the store alongside the aggregation; it is not
            // completed in this method when reportErrors is set.
            final String exchangesKey = correlationId + "-exchanges";
            List<MessageExchange> pending = (List<MessageExchange>) store.load(exchangesKey);
            if (pending == null) {
                pending = new ArrayList<MessageExchange>();
            }
            pending.add(exchange);
            store.store(exchangesKey, pending);
            discardLock = false;
        }

        if (!addMessage(aggregate, inMessage, exchange)) {
            // addMessage returned false: store the aggregation and, if a timeout was
            // computed, schedule a timer for it.
            store.store(correlationId, aggregate);
            if (expiry != null) {
                logger.debug("Scheduling timeout at {} for aggregate {}", expiry, correlationId);
                final TimerListener expiryListener = new TimerListener() {
                    public void timerExpired(Timer timer) {
                        AbstractAggregator.this.onTimeout(processCorrelationId, correlationId, timer);
                    }
                };
                final Timer scheduled = getTimerManager().schedule(expiryListener, expiry);
                timers.put(correlationId, scheduled);
            }
            discardLock = false;
        } else {
            // addMessage returned true: send the aggregate.
            sendAggregate(processCorrelationId, correlationId, aggregate, false, isSynchronous(exchange));
        }

        if (!reportErrors) {
            done(exchange);
        }
    } finally {
        // A failure to unlock is logged and does not propagate.
        try {
            aggregationLock.unlock();
        } catch (Exception ex) {
            logger.info("Caught exception while attempting to release aggregation lock", ex);
        }
        if (discardLock) {
            lockManager.removeLock(correlationId);
        }
    }
}