private void processConsumerAsync()

in engines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/StaticRoutingSlip.java [179:255]


    /**
     * Handles an exchange for which this endpoint acted as consumer, i.e. the
     * answer coming back from one of the routing slip targets.
     * <p>
     * Depending on the state of the answer:
     * <ul>
     *   <li>ERROR: the error is reported on the exchange stored under the
     *       correlation id;</li>
     *   <li>fault: the fault is copied to the exchange stored under the
     *       correlation id, which is then sent;</li>
     *   <li>out message from the last target: the out message is copied to the
     *       exchange stored under the correlation id, which is then sent;</li>
     *   <li>out message from any other target: a new in-out exchange is created
     *       for the next target, with the out message as its in message.</li>
     * </ul>
     * In every handled case the exchange stored under the "previous" id, if any,
     * is acknowledged with DONE.
     *
     * @param exchange the exchange received back from a target
     * @throws IllegalStateException if the correlation or index property is
     *         missing, if the exchange status is DONE, or if the exchange is
     *         neither in ERROR nor carries a fault or an out message
     * @throws Exception if one of the store or messaging operations fails
     */
    private void processConsumerAsync(MessageExchange exchange) throws Exception {
        String correlationId = (String) exchange.getProperty(correlation);
        String previousId = (String) exchange.getProperty(previous);
        Integer prevIndex = (Integer) exchange.getProperty(index);
        if (correlationId == null) {
            throw new IllegalStateException(correlation + " property not found");
        }
        if (prevIndex == null) {
            // Report the property that is actually missing (the index, not the previous id)
            throw new IllegalStateException(index + " property not found");
        }

        // This should never happen, as we can only send DONE
        if (exchange.getStatus() == ExchangeStatus.DONE) {
            throw new IllegalStateException("Exchange status is " + ExchangeStatus.DONE);
        }

        if (exchange.getStatus() == ExchangeStatus.ERROR) {
            // ERROR are sent back to the consumer
            MessageExchange consumerExchange = (MessageExchange) store.load(correlationId);
            fail(consumerExchange, exchange.getError());
            ackPreviousTarget(previousId);
        } else if (exchange.getFault() != null) {
            // Faults are sent back to the consumer; the target exchange is stored
            // under its own id, which becomes the correlation id of the consumer exchange
            MessageExchange consumerExchange = (MessageExchange) store.load(correlationId);
            consumerExchange.setProperty(correlation, exchange.getExchangeId());
            store.store(exchange.getExchangeId(), exchange);
            MessageUtil.transferFaultToFault(exchange, consumerExchange);
            send(consumerExchange);
            ackPreviousTarget(previousId);
        } else if (exchange.getMessage("out") != null) {
            // Out message, give it to next target or back to consumer
            if (prevIndex.intValue() == targets.length - 1) {
                // This is the answer from the last target
                MessageExchange consumerExchange = (MessageExchange) store.load(correlationId);
                consumerExchange.setProperty(correlation, exchange.getExchangeId());
                store.store(exchange.getExchangeId(), exchange);
                MessageUtil.transferOutToOut(exchange, consumerExchange);
                send(consumerExchange);
            } else {
                // We still have a target to hit
                MessageExchange nextExchange = getExchangeFactory().createInOutExchange();
                int nextIndex = prevIndex.intValue() + 1;
                nextExchange.setProperty(correlation, correlationId);
                nextExchange.setProperty(index, Integer.valueOf(nextIndex));
                nextExchange.setProperty(previous, exchange.getExchangeId());
                targets[nextIndex].configureTarget(nextExchange, getContext());
                store.store(exchange.getExchangeId(), exchange);
                MessageUtil.transferOutToIn(exchange, nextExchange);
                try {
                    send(nextExchange);
                } catch (RuntimeException re) {
                    // Send delivery channel errors back to the calling endpoint.
                    // correlationId is known to be non-null at this point.
                    // NOTE(review): the exchange stored just above is not acknowledged
                    // on this path - confirm whether that is intended.
                    MessageExchange consumerExchange = (MessageExchange) store.load(correlationId);
                    fail(consumerExchange, re);
                }
            }
            ackPreviousTarget(previousId);
        } else {
            // This should not happen
            throw new IllegalStateException("Exchange status is " + ExchangeStatus.ACTIVE
                    + " but has no Out nor Fault message");
        }
    }

    /**
     * Acknowledges with DONE the exchange stored under the given id, if any.
     *
     * @param previousId the store id of the previous target exchange, or
     *        <code>null</code> when there is nothing to acknowledge
     * @throws Exception if loading or acknowledging the exchange fails
     */
    private void ackPreviousTarget(String previousId) throws Exception {
        if (previousId != null) {
            MessageExchange previousExchange = (MessageExchange) store.load(previousId);
            done(previousExchange);
        }
    }