in core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java [551:662]
/**
 * Handles an exchange arriving on this delivery channel.
 * <p>
 * The exchange is first reconciled with the instance registered in
 * {@code exchangesById} under the same key: if a different instance is
 * registered, the incoming state is copied into it and processing continues
 * with the registered instance. The exchange then follows one of three paths:
 * <ol>
 * <li>its sync state is {@code SYNC_STATE_SYNC_SENT}: the state is switched to
 * {@code SYNC_STATE_SYNC_RECEIVED}, {@code notifyExchange} is called and the
 * method returns without queueing;</li>
 * <li>an exchange listener is available and the container reports optimized
 * delivery: the exchange is accepted, the container's
 * {@code ExchangeListener}s are informed and the exchange is handed directly
 * to {@code onMessageExchange};</li>
 * <li>otherwise the exchange is put on {@code queue}; for a transacted,
 * ACTIVE exchange the transaction is suspended first and, unless the tx state
 * is {@code TX_STATE_CONVEYED}, this thread blocks in {@code waitForExchange}
 * and resumes the transaction afterwards.</li>
 * </ol>
 * If the thread is interrupted while queueing or waiting, the exchange packet
 * is flagged as aborted and the thread's interrupt status is restored.
 *
 * @param me the inbound exchange
 * @throws MessagingException propagated from the channel, transaction or
 *         listener calls made here
 */
public void processInBound(MessageExchangeImpl me) throws MessagingException {
    LOGGER.trace("Processing inbound exchange: {}", me);
    // Check if the delivery channel has been closed
    checkNotClosed();
    // Retrieve the original exchange sent. When a distinct instance is
    // registered, bring it up to date with the incoming data and keep
    // working on the registered instance.
    MessageExchangeImpl original = exchangesById.get(me.getKey());
    if (original != null && me != original) {
        original.copyFrom(me);
        me = original;
    }
    // Check if the incoming exchange is a response to a synchronous
    // exchange previously sent. In this case, we do not have to queue it,
    // but rather notify the waiting thread.
    if (me.getSyncState() == MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
        // 'me' is the registered exchange whenever one exists (see above),
        // so it is used here instead of 'original', which may be null when
        // nothing is registered under this key.
        suspendTx(me);
        me.setSyncState(MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED);
        notifyExchange(me, me, "processInboundSynchronousExchange");
        return;
    }
    // If the component implements the MessageExchangeListener,
    // the delivery can be made synchronously, so we don't need
    // to bother with transactions
    MessageExchangeListener listener = getExchangeListener();
    if (listener != null && this.container.isOptimizedDelivery()) {
        me.handleAccept();
        LOGGER.trace("Received: {}", me);
        // Call input listeners; a failing listener is logged and must not
        // prevent delivery.
        ExchangeListener[] l = (ExchangeListener[]) container.getListeners(ExchangeListener.class);
        ExchangeEvent event = new ExchangeEvent(me, ExchangeEvent.EXCHANGE_ACCEPTED);
        for (int i = 0; i < l.length; i++) {
            try {
                l[i].exchangeAccepted(event);
            } catch (Exception e) {
                LOGGER.warn("Error calling listener: {}", e.getMessage(), e);
            }
        }
        // Set the flag that the exchange was delivered using push mode.
        // This is important for transaction boundaries.
        me.setPushDeliver(true);
        // Deliver the exchange with the component's class loader as the
        // thread context class loader, restoring the previous one afterwards.
        ClassLoader old = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(component.getComponent().getClass().getClassLoader());
            listener.onMessageExchange(me);
        } finally {
            Thread.currentThread().setContextClassLoader(old);
        }
        // TODO: handle delayed exchange notifications
        return;
    }
    // Component uses pull delivery.
    // If the exchange is transacted, special care should be taken.
    // But if the exchange is no more ACTIVE, just queue it, as
    // we will never have an answer back.
    if (me.isTransacted() && me.getStatus() == ExchangeStatus.ACTIVE) {
        if (me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED) {
            // The transaction is conveyed by the exchange:
            // we do not need to resume the transaction in this thread.
            try {
                suspendTx(me);
                queue.put(me);
            } catch (InterruptedException e) {
                abortOnInterrupt(me, e);
            }
        } else {
            // The delivery / send are enlisted in the current tx.
            // We must suspend the transaction, queue the exchange, and wait
            // for the answer to be sent, at which time the tx is resumed in
            // this thread.
            Object lock = new Object();
            synchronized (lock) {
                try {
                    me.setTxLock(lock);
                    suspendTx(me);
                    queue.put(me);
                    // NOTE(review): a timeout argument of 0 presumably means
                    // "wait without limit" - confirm against waitForExchange.
                    waitForExchange(me, lock, 0, "processInboundTransactionalExchange");
                } catch (InterruptedException e) {
                    abortOnInterrupt(me, e);
                } finally {
                    me.setTxLock(null);
                    resumeTx(me);
                }
            }
        }
    } else {
        // Not transacted, or no longer ACTIVE: just queue the exchange.
        try {
            queue.put(me);
        } catch (InterruptedException e) {
            abortOnInterrupt(me, e);
        }
    }
}

/**
 * Records that inbound processing of an exchange was cut short by a thread
 * interruption: logs the event, flags the exchange packet as aborted and
 * re-asserts the interrupt status so code further up the stack can see it.
 *
 * @param me the exchange being processed when the interruption occurred
 * @param e the interruption that was caught
 */
private void abortOnInterrupt(MessageExchangeImpl me, InterruptedException e) {
    LOGGER.debug("Exchange {} aborted due to thread interruption", me.getExchangeId(), e);
    me.getPacket().setAborted(true);
    // Catching InterruptedException clears the flag; restore it.
    Thread.currentThread().interrupt();
}