in core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java [343:405]
/**
 * Sends the given exchange through the container.
 * <p>
 * In order, this method: rejects exchanges whose packet is flagged as aborted,
 * runs the auto-enlist and auto-persistence helpers, throttles ACTIVE exchanges
 * sent in the consumer role, records this component as the source for consumer
 * exchanges, notifies every registered {@link ExchangeListener}, hands the
 * exchange over via {@code handleSend}, and finally passes the mirror exchange
 * to the container.
 * <p>
 * Whatever the outcome, if the exchange carries a tx lock the mirror's
 * transaction is suspended (when its state is ENLISTED) and the lock holder
 * is notified.
 *
 * @param me   the exchange being sent
 * @param sync forwarded unchanged to {@code me.handleSend(sync)}
 * @throws MessagingException if sending fails; an {@code ExchangeTimeoutException}
 *                            is raised when the exchange packet is marked as aborted
 */
protected void doSend(MessageExchangeImpl me, boolean sync) throws MessagingException {
    MessageExchangeImpl mirrorExchange = me.getMirror();
    // Snapshot taken on entry, before handleSend() is invoked below
    boolean wasFinished = me.getStatus() != ExchangeStatus.ACTIVE;
    try {
        LOGGER.trace("Sent: {}", me);

        // An aborted packet means the exchange has timed out
        if (me.getPacket().isAborted()) {
            throw new ExchangeTimeoutException(me);
        }

        // Transaction enlistment first, then persistence info
        autoEnlistInTx(me);
        autoSetPersistent(me);

        // Throttling only applies to consumer -> provider sends of an ACTIVE
        // exchange; responses and DONE exchanges are never throttled
        boolean activeConsumerSend = me.getRole().equals(Role.CONSUMER)
                && me.getStatus().equals(ExchangeStatus.ACTIVE);
        if (activeConsumerSend) {
            throttle();
        }

        // Remember which component acted as the consumer
        if (me.getRole() == Role.CONSUMER) {
            me.setSourceId(component.getComponentNameSpace());
        }

        // Listeners must be notified before the ownership changes
        ExchangeListener[] exchangeListeners =
                (ExchangeListener[]) container.getListeners(ExchangeListener.class);
        ExchangeEvent sentEvent = new ExchangeEvent(me, ExchangeEvent.EXCHANGE_SENT);
        for (ExchangeListener listener : exchangeListeners) {
            try {
                listener.exchangeSent(sentEvent);
            } catch (Exception listenerFailure) {
                // A failing listener is logged and does not stop the send
                LOGGER.warn("Error calling listener: {}", listenerFailure.getMessage(), listenerFailure);
            }
        }

        // Ownership changes here
        me.handleSend(sync);
        mirrorExchange.setTxState(MessageExchangeImpl.TX_STATE_NONE);

        // A DONE or ERROR status coming from a synchronous transactional
        // exchange should not be part of the transaction, so its tx context
        // is removed
        boolean dropTxContext = wasFinished
                && me.getTxLock() == null
                && me.getTxState() == MessageExchangeImpl.TX_STATE_CONVEYED
                && !me.isPushDelivery()
                && me.getRole() == Role.CONSUMER;
        if (dropTxContext) {
            me.setTransactionContext(null);
        }

        container.sendExchange(mirrorExchange);
    } catch (MessagingException failure) {
        LOGGER.debug("Exception processing: {} in {}", me.getExchangeId(), this);
        throw failure;
    } finally {
        // With a tx lock present, suspend the transaction if needed and
        // notify on the lock
        if (me.getTxLock() != null) {
            if (mirrorExchange.getTxState() == MessageExchangeImpl.TX_STATE_ENLISTED) {
                suspendTx(mirrorExchange);
            }
            synchronized (me.getTxLock()) {
                notifyExchange(me, me.getTxLock(), "doSendWithTxLock");
            }
        }
    }
}