private void processProvider()

in engines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractAggregator.java [304:376]


    /**
     * Adds the message carried by the given exchange to the aggregation
     * identified by its correlation id.
     * <p>
     * While holding the lock obtained for that correlation id, the aggregation
     * is loaded from the store (or created when none exists and the aggregation
     * is not reported as closed). If <code>addMessage</code> returns
     * <code>true</code> the aggregate is sent; otherwise the aggregation is
     * written back to the store and, when a timeout date is available, a timer
     * is scheduled for it.
     *
     * @param exchange the incoming exchange to aggregate
     * @throws IllegalArgumentException if no correlation id can be retrieved
     *         for the exchange
     * @throws Exception any failure raised by the store, the aggregation
     *         callbacks or the exchange completion calls
     */
    private void processProvider(MessageExchange exchange) throws Exception {
        final String processCorrelationId = (String) exchange.getProperty(JbiConstants.CORRELATION_ID);

        NormalizedMessage inMessage = MessageUtil.copyIn(exchange);
        final String correlationId = getCorrelationID(exchange, inMessage);
        if (correlationId == null || correlationId.length() == 0) {
            throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
        }

        // Everything below runs under the lock associated with this correlation id
        final Lock aggregationLock = getLockManager().getLock(correlationId);
        aggregationLock.lock();
        // Cleared on the paths where the lock entry must outlive this call
        boolean discardLockEntry = true;
        try {
            // Load the existing aggregation, or create a new one if allowed
            Object aggregate = store.load(correlationId);
            Date expiry = null;
            if (aggregate != null) {
                if (isRescheduleTimeouts()) {
                    expiry = getTimeout(aggregate);
                }
            } else if (!isAggregationClosed(correlationId)) {
                aggregate = createAggregation(correlationId);
                expiry = getTimeout(aggregate);
            }

            // No aggregation available: it has been closed
            // TODO: should we return an error here ?
            if (aggregate == null) {
                if (reportClosedAggregatesAsErrors) {
                    fail(exchange, new ClosedAggregateException());
                } else {
                    done(exchange);
                }
                // the finally block below still releases the lock
                return;
            }

            if (reportErrors) {
                // Remember the exchange alongside the aggregation; it is not
                // completed here when reportErrors is set
                final String exchangesKey = correlationId + "-exchanges";
                List<MessageExchange> pending = (List<MessageExchange>) store.load(exchangesKey);
                if (pending == null) {
                    pending = new ArrayList<MessageExchange>();
                }
                pending.add(exchange);
                store.store(exchangesKey, pending);
                discardLockEntry = false;
            }

            final boolean complete = addMessage(aggregate, inMessage, exchange);
            if (complete) {
                sendAggregate(processCorrelationId, correlationId, aggregate, false, isSynchronous(exchange));
            } else {
                // Still incomplete: persist it and arm the timeout if there is one
                store.store(correlationId, aggregate);
                if (expiry != null) {
                    logger.debug("Scheduling timeout at {} for aggregate {}", expiry, correlationId);
                    Timer scheduled = getTimerManager().schedule(new TimerListener() {
                        public void timerExpired(Timer expired) {
                            AbstractAggregator.this.onTimeout(processCorrelationId, correlationId, expired);
                        }
                    }, expiry);
                    timers.put(correlationId, scheduled);
                }
                discardLockEntry = false;
            }

            if (!reportErrors) {
                done(exchange);
            }
        } finally {
            // A failure to unlock is logged only, so it never masks an
            // exception thrown from the try block
            try {
                aggregationLock.unlock();
            } catch (Exception unlockFailure) {
                logger.info("Caught exception while attempting to release aggregation lock", unlockFailure);
            }
            if (discardLockEntry) {
                lockManager.removeLock(correlationId);
            }
        }
    }