in core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java [265:312]
/**
 * Waits up to {@code timeoutMS} milliseconds for an exchange to arrive on this
 * channel's queue and hands it to the caller.
 * <p>
 * An exchange whose packet is flagged as aborted is dropped and {@code null} is
 * returned instead. Otherwise the exchange is accepted via {@code handleAccept()}
 * and every registered {@link ExchangeListener} is notified with an
 * {@code EXCHANGE_ACCEPTED} event; a failing listener is logged and does not
 * prevent the remaining listeners from being called.
 *
 * @param timeoutMS maximum time to wait for an exchange, in milliseconds
 * @return the accepted exchange, or {@code null} if nothing was polled within
 *         the timeout or the polled exchange had been aborted
 * @throws MessagingException if the calling thread is interrupted while waiting
 *         (the thread's interrupt status is restored before throwing);
 *         presumably also raised by {@code checkNotClosed()} when the channel
 *         is closed - confirm against that helper
 */
public MessageExchange accept(long timeoutMS) throws MessagingException {
    try {
        checkNotClosed();
        MessageExchangeImpl me = queue.poll(timeoutMS, TimeUnit.MILLISECONDS);
        if (me == null) {
            // Nothing arrived before the timeout elapsed
            return null;
        }
        // If the exchange has already timed out, do not give it to the component
        if (me.getPacket().isAborted()) {
            LOGGER.debug("Aborted {} in {}", me.getExchangeId(), this);
            return null;
        }
        LOGGER.debug("Accepting {} in {}", me.getExchangeId(), this);
        if (me.getTxLock() != null && me.getStatus() != ExchangeStatus.ACTIVE) {
            // We hold a tx lock and the exchange is no longer active:
            // notify here without resuming the transaction
            notifyExchange(me.getMirror(), me.getTxLock(), "acceptFinishedExchangeWithTxLock");
        } else if (!me.isTransacted() || me.getStatus() == ExchangeStatus.ACTIVE) {
            // A finished exchange delivered transactionally must NOT resume
            // the transaction; every other case does
            resumeTx(me);
        }
        me.handleAccept();
        LOGGER.trace("Accepted: {}", me);

        // Call input listeners
        ExchangeListener[] listeners = (ExchangeListener[]) container.getListeners(ExchangeListener.class);
        ExchangeEvent event = new ExchangeEvent(me, ExchangeEvent.EXCHANGE_ACCEPTED);
        for (ExchangeListener listener : listeners) {
            try {
                listener.exchangeAccepted(event);
            } catch (Exception e) {
                // Best effort: one misbehaving listener must not block the others
                LOGGER.warn("Error calling listener: {}", e.getMessage(), e);
            }
        }
        return me;
    } catch (InterruptedException e) {
        // Restore the interrupt status so callers further up the stack can
        // still observe that this thread was asked to stop
        Thread.currentThread().interrupt();
        throw new MessagingException("accept failed", e);
    }
}