protected void processAsync()

in engines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRecipientList.java [157:239]


    protected void processAsync(MessageExchange exchange) throws Exception {
        if (exchange.getRole() == MessageExchange.Role.CONSUMER) {
            String corrId = (String) exchange.getMessage("in").getProperty(RECIPIENT_LIST_CORRID);
            int count = (Integer) exchange.getMessage("in").getProperty(RECIPIENT_LIST_COUNT);
            Integer acks = null;
            Lock lock = lockManager.getLock(corrId);
            lock.lock();
            boolean removeLock = true;
            try {
                acks = (Integer) store.load(corrId + ".acks");
                if (exchange.getStatus() == ExchangeStatus.DONE) {
                    // If the acks integer is not here anymore, the message response has been sent already
                    if (acks != null) {
                        if (acks + 1 >= count) {
                            MessageExchange me = (MessageExchange) store.load(corrId);
                            done(me);
                        } else {
                            store.store(corrId + ".acks", Integer.valueOf(acks + 1));
                            removeLock = false;
                        }
                    }
                } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
                    // If the acks integer is not here anymore, the message response has been sent already
                    if (acks != null) {
                        if (reportErrors) {
                            MessageExchange me = (MessageExchange) store.load(corrId);
                            fail(me, exchange.getError());
                        } else  if (acks + 1 >= count) {
                            MessageExchange me = (MessageExchange) store.load(corrId);
                            done(me);
                        } else {
                            store.store(corrId + ".acks", Integer.valueOf(acks + 1));
                            removeLock = false;
                        }
                    }
                } else if (exchange.getFault() != null) {
                    // If the acks integer is not here anymore, the message response has been sent already
                    if (acks != null) {
                        if (reportErrors) {
                            MessageExchange me = (MessageExchange) store.load(corrId);
                            MessageUtil.transferToFault(MessageUtil.copyFault(exchange), me);
                            send(me);
                            done(exchange);
                        } else  if (acks + 1 >= count) {
                            MessageExchange me = (MessageExchange) store.load(corrId);
                            done(me);
                        } else {
                            store.store(corrId + ".acks", Integer.valueOf(acks + 1));
                            removeLock = false;
                        }
                    } else {
                        done(exchange);
                    }
                }
            } finally {
                try {
                    lock.unlock();
                } catch (Exception ex) {
                    logger.info("Caught exception while attempting to release lock", ex);
                }
                if (removeLock) {
                    lockManager.removeLock(corrId);
                }
            }
        } else {
            if (!(exchange instanceof InOnly) && !(exchange instanceof RobustInOnly)) {
                fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
            } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
                store.store(exchange.getExchangeId(), exchange);
                store.store(exchange.getExchangeId() + ".acks", Integer.valueOf(0));
                NormalizedMessage in = MessageUtil.copyIn(exchange);
                for (int i = 0; i < recipients.length; i++) {
                    MessageExchange me = getExchangeFactory().createExchange(exchange.getPattern());
                    recipients[i].configureTarget(me, getContext());
                    in.setProperty(RECIPIENT_LIST_COUNT, new Integer(recipients.length));
                    in.setProperty(RECIPIENT_LIST_INDEX, new Integer(i));
                    in.setProperty(RECIPIENT_LIST_CORRID, exchange.getExchangeId());
                    MessageUtil.transferToIn(in, me);
                    send(me);
                }
            }
        }
    }