protected void processAsync()

in engines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/AbstractSplitter.java [234:322]


    /**
     * Processes an exchange asynchronously, on either side of the splitter.
     *
     * <p>When the exchange role is {@code CONSUMER}, the exchange is the reply to one of the
     * parts sent out earlier. The correlation id and the number of parts are read from the
     * {@code SPLITTER_CORRID} and {@code SPLITTER_COUNT} properties of the "in" message, and
     * the acknowledgement counter kept in the store under {@code corrId + ".acks"} is updated
     * while holding the lock obtained from the lock manager for that correlation id:
     * <ul>
     *   <li>once the number of acknowledgements reaches the part count, the exchange stored
     *       under the correlation id is loaded and marked done;</li>
     *   <li>otherwise the incremented counter is stored again and the lock is kept for the
     *       replies still to come;</li>
     *   <li>if {@code reportErrors} is set, an ERROR reply fails the stored exchange with the
     *       reply's error, and a fault reply is copied onto the stored exchange, which is
     *       then sent back, after which the reply itself is marked done.</li>
     * </ul>
     *
     * <p>For any other role, the exchange is an incoming message to split: DONE and ERROR
     * statuses are ignored, exchanges that are neither {@code InOnly} nor
     * {@code RobustInOnly} are failed, exchanges carrying a fault are marked done, and
     * anything else is stored, split into parts and each part is sent to the target.
     *
     * @param exchange the exchange to process
     * @throws Exception if thrown by the store, the target or the exchange handling calls
     */
    protected void processAsync(MessageExchange exchange) throws Exception {
        if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
            String corrId = (String) exchange.getMessage("in").getProperty(SPLITTER_CORRID);
            int count = (Integer) exchange.getMessage("in").getProperty(SPLITTER_COUNT);
            Integer acks = null;
            Lock lock = lockManager.getLock(corrId);
            lock.lock();
            // The lock is only kept around while further replies are still expected
            boolean removeLock = true;
            try {
                acks = (Integer) store.load(corrId + ".acks");
                if (exchange.getStatus() == ExchangeStatus.DONE) {
                    // If the acks integer is not here anymore, the message response has been sent already
                    if (acks != null) {
                        int received = acks + 1;
                        if (received >= count) {
                            // Every part has been acknowledged: complete the original exchange
                            MessageExchange me = (MessageExchange) store.load(corrId);
                            done(me);
                        } else {
                            // Still waiting for other parts: remember the progress made so far
                            store.store(corrId + ".acks", Integer.valueOf(received));
                            removeLock = false;
                        }
                    }
                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
                    // If the acks integer is not here anymore, the message response has been sent already
                    if (acks != null) {
                        int received = acks + 1;
                        if (reportErrors) {
                            // Propagate the first error to the sender of the original exchange
                            MessageExchange me = (MessageExchange) store.load(corrId);
                            fail(me, exchange.getError());
                        } else if (received >= count) {
                            // Errors are not reported: an errored part counts as acknowledged
                            MessageExchange me = (MessageExchange) store.load(corrId);
                            done(me);
                        } else {
                            store.store(corrId + ".acks", Integer.valueOf(received));
                            removeLock = false;
                        }
                    }
                } else if (exchange.getFault() != null) {
                    // If the acks integer is not here anymore, the message response has been sent already
                    if (acks != null) {
                        int received = acks + 1;
                        if (reportErrors) {
                            // Hand the fault back on the original exchange, then close the part
                            MessageExchange me = (MessageExchange) store.load(corrId);
                            MessageUtil.transferToFault(MessageUtil.copyFault(exchange), me);
                            send(me);
                            done(exchange);
                        } else if (received >= count) {
                            // NOTE(review): the faulted part exchange itself is not marked done
                            // on this path nor on the one below - confirm this is intended
                            MessageExchange me = (MessageExchange) store.load(corrId);
                            done(me);
                        } else {
                            store.store(corrId + ".acks", Integer.valueOf(received));
                            removeLock = false;
                        }
                    } else {
                        done(exchange);
                    }
                }
            } finally {
                try {
                    lock.unlock();
                } catch (Exception ex) {
                    logger.info("Caught exception while attempting to release lock", ex);
                }
                if (removeLock) {
                    lockManager.removeLock(corrId);
                }
            }
        } else {
            if (exchange.getStatus() == ExchangeStatus.DONE) {
                return;
            } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
                return;
            } else if (!(exchange instanceof InOnly) && !(exchange instanceof RobustInOnly)) {
                fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
            } else if (exchange.getFault() != null) {
                done(exchange);
            } else {
                // Keep the original exchange so it can be completed once the parts are acknowledged
                store.store(exchange.getExchangeId(), exchange);
                MessageExchange[] parts = createParts(exchange);
                // The acknowledgement counter starts at zero, before any part is sent
                store.store(exchange.getExchangeId() + ".acks", Integer.valueOf(0));
                for (int i = 0; i < parts.length; i++) {
                    target.configureTarget(parts[i], getContext());
                    send(parts[i]);
                }
                // The exchange is deliberately not marked done here: that happens on the
                // consumer side, when the replies to the parts are processed
            }
        }
    }