in engines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java [226:316]
public void process(MessageExchange exchange) throws Exception {
// Three exchanges are involved: the first InOut will be called t0,
// the InOnly send will be called t1 and the InOnly received will be called t2
if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
// Step1: receive t0 as the first message
if (exchange instanceof InOut && exchange.getStatus() == ExchangeStatus.ACTIVE) {
MessageExchange t0 = exchange;
MessageExchange t1;
final String correlationId = (String) requestCorrId.evaluate(t0, t0.getMessage("in"));
if (correlationId == null || correlationId.length() == 0) {
throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
}
store.store(correlationId + ".t0", t0);
t1 = useRobustInOnly ? getExchangeFactory().createRobustInOnlyExchange()
: getExchangeFactory().createInOnlyExchange();
target.configureTarget(t1, getContext());
MessageUtil.transferInToIn(t0, t1);
t1.setProperty(responseCorrIdProperty, correlationId);
t1.getMessage("in").setProperty(responseCorrIdProperty, correlationId);
send(t1);
// Receive the done / error from t0
} else if (exchange instanceof InOut && exchange.getStatus() != ExchangeStatus.ACTIVE) {
MessageExchange t0 = exchange;
MessageExchange t1;
MessageExchange t2;
final String correlationId = (String) requestCorrId.evaluate(t0, t0.getMessage("in"));
t1 = (MessageExchange) store.load(correlationId + ".t1");
t2 = (MessageExchange) store.load(correlationId + ".t2");
if (t1 != null) {
done(t1);
}
if (t2 != null) {
done(t2);
}
// Receive the response from t2
} else if ((exchange instanceof InOnly || exchange instanceof RobustInOnly) && exchange.getStatus() == ExchangeStatus.ACTIVE) {
MessageExchange t0;
MessageExchange t2 = exchange;
final String correlationId = (String) responseCorrId.evaluate(t2, t2.getMessage("in"));
if (correlationId == null || correlationId.length() == 0) {
throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
}
t0 = (MessageExchange) store.load(correlationId + ".t0");
// The request is found and has not timed out
if (t0 != null) {
store.store(correlationId + ".t2", t2);
MessageUtil.transferInToOut(t2, t0);
send(t0);
} else {
done(t2);
}
} else {
throw new IllegalStateException();
}
// Handle an exchange as a CONSUMER
} else {
// Step 2: receive t1 response
// If this is an error or a fault, transfer it from t1 to t0 and send,
// else, start a timeout to wait for t2
MessageExchange t1 = exchange;
// an error
final String correlationId = (String) t1.getProperty(responseCorrIdProperty);
if (t1.getStatus() == ExchangeStatus.ERROR) {
MessageExchange t0 = (MessageExchange) store.load(correlationId + ".t0");
// t1 response may come after t0, so in case this happens, we need to discard t1
if (t0 != null) {
fail(t0, t1.getError());
}
// a fault ?
} else if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
MessageExchange t0 = (MessageExchange) store.load(correlationId + ".t0");
// t1 response may come after t0, so in case this happens, we need to discard t1
if (t0 != null) {
store.store(correlationId + ".t1", t1);
MessageUtil.transferFaultToFault(t1, t0);
send(t0);
}
// request sent successfully, start the timeout
} else {
Date exchangeTimeout = getTimeout(t1);
if (exchangeTimeout != null) {
getTimerManager().schedule(new TimerListener() {
public void timerExpired(Timer timer) {
AsyncBridge.this.onTimeout(correlationId);
}
}, exchangeTimeout);
}
}
}
}