in modules/core/src/main/java/org/apache/sandesha2/util/WSRMMessageSender.java [124:214]
protected void sendOutgoingMessage(RMMsgContext rmMsgCtx, int msgType, long delay, Transaction transaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: WSRMParentProcessor::sendOutgoingMessage " + msgType + ", " + delay);
rmMsgCtx.setFlow(MessageContext.OUT_FLOW);
getMsgContext().setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
rmMsgCtx.setTo(new EndpointReference(toAddress));
String transportTo = rmsBean.getTransportTo();
if (transportTo != null) {
rmMsgCtx.setProperty(Constants.Configuration.TRANSPORT_URL, transportTo);
}
//setting msg context properties
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.SEQUENCE_ID, outSequenceID);
rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceID);
rmMsgCtx.addSOAPEnvelope();
// Ensure the outbound message us secured using the correct token
RMMsgCreator.secureOutboundMessage(getRMSBean(), msgContext);
String key = SandeshaUtil.getUUID();
SenderBean senderBean = new SenderBean();
senderBean.setMessageType(msgType);
senderBean.setMessageContextRefKey(key);
senderBean.setTimeToSend(System.currentTimeMillis() + delay);
senderBean.setMessageID(msgContext.getMessageID());
// Set the internal sequence id and outgoing sequence id for the terminate message
senderBean.setInternalSequenceID(internalSequenceID);
if (sequenceExists)
{
senderBean.setSend(true);
senderBean.setSequenceID(outSequenceID);
}
else
senderBean.setSend(false);
EndpointReference to = msgContext.getTo();
if (to!=null)
senderBean.setToAddress(to.getAddress());
// If this message is targetted at an anonymous address then we must not have a transport
// ready for it, as the current message is not a reply.
if(to == null || to.hasAnonymousAddress())
senderBean.setTransportAvailable(false);
msgContext.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
senderBean.setReSend(false);
SenderBeanMgr retramsmitterMgr = storageManager.getSenderBeanMgr();
SandeshaUtil.executeAndStore(rmMsgCtx, key, storageManager);
retramsmitterMgr.insert(senderBean);
if (sequenceExists && !storageManager.hasUserTransaction(msgContext)) {
String workId = msgContext.getMessageID()
+ senderBean.getTimeToSend();
SandeshaThread sender = storageManager.getSender();
WorkerLock lock = sender.getWorkerLock();
SenderWorker worker = new SenderWorker(configurationContext,
senderBean, rmsBean.getRMVersion());
worker.setLock(lock);
worker.setWorkId(workId);
// Actually take the lock
lock.addWork(workId, worker);
// Commit the transaction, so that the sender worker starts with a clean state
if (transaction != null && transaction.isActive())
transaction.commit();
if (worker != null) {
try {
worker.run();
} catch (Exception e) {
log.error("Caught exception running SandeshaWorker", e);
}
}
}
if (log.isDebugEnabled())
log.debug("Exit: WSRMParentProcessor::sendOutgoingMessage");
}