public void process()

in engines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java [226:316]


    public void process(MessageExchange exchange) throws Exception {
        // Three exchanges are involved: the first InOut will be called t0,
        // the InOnly send will be called t1 and the InOnly received will be called t2

        if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
            // Step1: receive t0 as the first message
            if (exchange instanceof InOut && exchange.getStatus() == ExchangeStatus.ACTIVE) {
                MessageExchange t0 = exchange;
                MessageExchange t1;
                final String correlationId = (String) requestCorrId.evaluate(t0, t0.getMessage("in"));
                if (correlationId == null || correlationId.length() == 0) {
                    throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
                }
                store.store(correlationId + ".t0", t0);
                t1 = useRobustInOnly ? getExchangeFactory().createRobustInOnlyExchange()
                                     : getExchangeFactory().createInOnlyExchange();
                target.configureTarget(t1, getContext());
                MessageUtil.transferInToIn(t0, t1);
                t1.setProperty(responseCorrIdProperty, correlationId);
                t1.getMessage("in").setProperty(responseCorrIdProperty, correlationId);
                send(t1);
            // Receive the done / error from t0
            } else if (exchange instanceof InOut && exchange.getStatus() != ExchangeStatus.ACTIVE) {
                MessageExchange t0 = exchange;
                MessageExchange t1;
                MessageExchange t2;
                final String correlationId = (String) requestCorrId.evaluate(t0, t0.getMessage("in"));
                t1 = (MessageExchange) store.load(correlationId + ".t1");
                t2 = (MessageExchange) store.load(correlationId + ".t2");
                if (t1 != null) {
                    done(t1);
                }
                if (t2 != null) {
                    done(t2);
                }
            // Receive the response from t2
            } else if ((exchange instanceof InOnly || exchange instanceof RobustInOnly) && exchange.getStatus() == ExchangeStatus.ACTIVE) {
                MessageExchange t0;
                MessageExchange t2 = exchange;
                final String correlationId = (String) responseCorrId.evaluate(t2, t2.getMessage("in"));
                if (correlationId == null || correlationId.length() == 0) {
                    throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
                }
                t0 = (MessageExchange) store.load(correlationId + ".t0");
                // The request is found and has not timed out
                if (t0 != null) {
                    store.store(correlationId + ".t2", t2);
                    MessageUtil.transferInToOut(t2, t0);
                    send(t0);
                } else {
                    done(t2);
                }
            } else {
                throw new IllegalStateException();
            }
        // Handle an exchange as a CONSUMER
        } else {
            // Step 2: receive t1 response
            // If this is an error or a fault, transfer it from t1 to t0 and send,
            // else, start a timeout to wait for t2
            MessageExchange t1 = exchange;
            // an error
            final String correlationId = (String) t1.getProperty(responseCorrIdProperty);
            if (t1.getStatus() == ExchangeStatus.ERROR) {
                MessageExchange t0 = (MessageExchange) store.load(correlationId + ".t0");
                // t1 response may come after t0, so in case this happens, we need to discard t1
                if (t0 != null) {
                    fail(t0, t1.getError());
                }
            // a fault ?
            } else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
                MessageExchange t0 = (MessageExchange) store.load(correlationId + ".t0");
                // t1 response may come after t0, so in case this happens, we need to discard t1
                if (t0 != null) {
                    store.store(correlationId + ".t1", t1);
                    MessageUtil.transferFaultToFault(t1, t0);
                    send(t0);
                }
            // request sent successfully, start the timeout
            } else {
                Date exchangeTimeout = getTimeout(t1);
                if (exchangeTimeout != null) {
                    getTimerManager().schedule(new TimerListener() {
                        public void timerExpired(Timer timer) {
                            AsyncBridge.this.onTimeout(correlationId);
                        }
                    }, exchangeTimeout);
                }
            }
        }
    }