in core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java [444:503]
/**
 * Sends the exchange and blocks the calling thread until it has been answered
 * or the wait ends without an answer.
 *
 * <p>The exchange is flagged with the {@code JbiConstants.SEND_SYNC} property
 * (JBI 5.5.2.1.3) and registered in {@code exchangesById} under its key for the
 * duration of the call; the registration is always removed before returning or
 * throwing.</p>
 *
 * @param messageExchange the exchange to send; it is cast to
 *        {@code MessageExchangeImpl}, so any other implementation fails with a
 *        {@code ClassCastException}
 * @param timeout wait limit handed unchanged to {@code waitForExchange}
 *        (presumably milliseconds, as in the JBI API -- TODO confirm)
 * @return {@code true} if the exchange reached the
 *         {@code SYNC_STATE_SYNC_RECEIVED} state and was accepted;
 *         {@code false} if it did not, in which case the exchange packet is
 *         marked aborted and given a timeout error
 * @throws MessagingException if the waiting thread is interrupted (the
 *         {@code InterruptedException} is the cause and the thread's interrupt
 *         status is restored), or if raised by the helpers called here
 */
public boolean sendSync(MessageExchange messageExchange, long timeout) throws MessagingException {
    // Refuse to operate on a closed delivery channel
    checkNotClosed();
    LOGGER.debug("SendSync {} in {}", messageExchange.getExchangeId(), this);
    // JBI 5.5.2.1.3: set the sendSync property
    messageExchange.setProperty(JbiConstants.SEND_SYNC, Boolean.TRUE);
    MessageExchangeImpl me = (MessageExchangeImpl) messageExchange;
    String exchangeKey = me.getKey();
    try {
        exchangesById.put(exchangeKey, me);
        // Send while holding the exchange's monitor, then wait on it unless the
        // answer already arrived during doSend
        synchronized (me) {
            doSend(me, true);
            if (me.getSyncState() != MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
                waitForExchange(me, me, timeout, "sendSync");
            } else {
                LOGGER.debug("Exchange {} has already been answered (no need to wait)", messageExchange.getExchangeId());
            }
        }
        if (me.getSyncState() != MessageExchangeImpl.SYNC_STATE_SYNC_RECEIVED) {
            // JBI 5.5.2.1.3: the exchange should be set to ERROR status
            LOGGER.debug("Exchange {} has been aborted", messageExchange.getExchangeId());
            me.getPacket().setAborted(true);
            me.getPacket().setError(new RuntimeException("sendSync timeout for "
                    + messageExchange.getExchangeId()));
            return false;
        }
        me.handleAccept();
        // resumeTx is called unconditionally. A guard on the exchange's sync
        // sender thread (skipping the resume when the message was delivered in
        // the same thread, see processInBound) was once considered here but is
        // disabled.
        resumeTx(me);
        // Notify exchange listeners; a failing listener is logged and must not
        // prevent the remaining listeners from being called
        ExchangeListener[] listeners = (ExchangeListener[]) container.getListeners(ExchangeListener.class);
        ExchangeEvent event = new ExchangeEvent(me, ExchangeEvent.EXCHANGE_ACCEPTED);
        for (ExchangeListener listener : listeners) {
            try {
                listener.exchangeAccepted(event);
            } catch (Exception e) {
                LOGGER.warn("Error calling listener: {}", e.getMessage(), e);
            }
        }
        return true;
    } catch (InterruptedException e) {
        // Catching InterruptedException clears the interrupt flag; restore it so
        // callers further up the stack can still observe the interruption
        Thread.currentThread().interrupt();
        throw new MessagingException(e);
    } finally {
        exchangesById.remove(exchangeKey);
    }
}