in modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java [591:725]
private RMSBean addCreateSequenceMessage(RMMsgContext applicationRMMsg, RMSBean rmsBean,
StorageManager storageManager) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: ApplicationMsgProcessor::addCreateSequenceMessage, " + rmsBean);
MessageContext applicationMsg = applicationRMMsg.getMessageContext();
ConfigurationContext configCtx = applicationMsg.getConfigurationContext();
// generating a new create sequence message.
RMMsgContext createSeqRMMessage = RMMsgCreator.createCreateSeqMsg(rmsBean, applicationRMMsg);
createSeqRMMessage.setFlow(MessageContext.OUT_FLOW);
CreateSequence createSequencePart = createSeqRMMessage.getCreateSequence();
SequenceOffer offer = createSequencePart.getSequenceOffer();
if (offer != null) {
String offeredSequenceId = offer.getIdentifer().getIdentifier();
rmsBean.setOfferedSequence(offeredSequenceId);
}
MessageContext createSeqMsg = createSeqRMMessage.getMessageContext();
createSeqMsg.setRelationships(null); // create seq msg does not
// relateTo anything
String createSequenceMessageStoreKey = SandeshaUtil.getUUID(); // the key that will be used to store
//the create sequence message.
rmsBean.setCreateSeqMsgID(createSeqMsg.getMessageID());
rmsBean.setCreateSequenceMsgStoreKey(createSequenceMessageStoreKey);
if (storageManager.getRMSBeanMgr().insert(rmsBean)) {
//cloning the message and storing it as a reference.
MessageContext clonedMessage = SandeshaUtil.cloneMessageContext(createSeqMsg);
String clonedMsgStoreKey = SandeshaUtil.getUUID();
storageManager.storeMessageContext(clonedMsgStoreKey, clonedMessage);
rmsBean.setReferenceMessageStoreKey(clonedMsgStoreKey);
SecurityToken token = (SecurityToken) createSeqRMMessage.getProperty(Sandesha2Constants.MessageContextProperties.SECURITY_TOKEN);
if(token != null) {
SecurityManager secManager = SandeshaUtil.getSecurityManager(configCtx);
rmsBean.setSecurityTokenData(secManager.getTokenRecoveryData(token));
}
// Update the RMSBean
storageManager.getRMSBeanMgr().update(rmsBean);
SenderBean createSeqEntry = new SenderBean();
createSeqEntry.setMessageContextRefKey(createSequenceMessageStoreKey);
createSeqEntry.setTimeToSend(System.currentTimeMillis());
createSeqEntry.setMessageID(createSeqRMMessage.getMessageId());
createSeqEntry.setInternalSequenceID(rmsBean.getInternalSequenceID());
// this will be set to true in the sender
createSeqEntry.setSend(true);
// Indicate that this message is a create sequence
createSeqEntry.setMessageType(Sandesha2Constants.MessageTypes.CREATE_SEQ);
EndpointReference to = createSeqRMMessage.getTo();
if (to!=null)
createSeqEntry.setToAddress(to.getAddress());
// If this message is targetted at an anonymous address then we must not have a transport
// ready for it, as the create sequence is not a reply.
if(to == null || to.hasAnonymousAddress())
createSeqEntry.setTransportAvailable(false);
createSeqMsg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
SandeshaUtil.executeAndStore(createSeqRMMessage, createSequenceMessageStoreKey, storageManager);
storageManager.getSenderBeanMgr().insert(createSeqEntry);
if(appMsgProcTran != null && createSeqRMMessage.getMessageId() != null && !storageManager.hasUserTransaction(createSeqMsg)) {
// Lock the sender bean before we insert it, if we are planning to send it ourselves
String workId = createSeqEntry.getMessageID() + createSeqEntry.getTimeToSend();
SandeshaThread sender = storageManager.getSender();
ConfigurationContext context = createSeqMsg.getConfigurationContext();
WorkerLock lock = sender.getWorkerLock();
SenderWorker worker = new SenderWorker(context, createSeqEntry, rmsBean.getRMVersion());
worker.setLock(lock);
worker.setWorkId(workId);
// Actually take the lock
lock.addWork(workId, worker);
// Commit the transaction, so that the sender worker starts with a clean slate.
if(appMsgProcTran.isActive()) appMsgProcTran.commit();
if(worker != null) {
try {
worker.run();
} catch(Exception e) {
log.debug("Caught exception running SandeshaWorker", e);
}
}
//Create transaction
appMsgProcTran = storageManager.getTransaction();
//Find RMSBean
RMSBeanMgr rmsBeanMgr = storageManager.getRMSBeanMgr();
RMSBean tempRMSBean = new RMSBean();
tempRMSBean.setInternalSequenceID(rmsBean.getInternalSequenceID());
rmsBean = rmsBeanMgr.findUnique(tempRMSBean);
// If the RMSBean has been terminated this means that we may
// well have encountered a problem sending this message
if (rmsBean == null || rmsBean.isTerminated()){
if (log.isDebugEnabled())
log.debug("Exit: ApplicationMsgProcessor::addCreateSequenceMessage, Failed to establish sequence " + rmsBean);
if (rmsBean != null && rmsBean.getLastSendError() != null) {
if (rmsBean.getLastSendError() instanceof AxisFault)
throw (AxisFault)rmsBean.getLastSendError();
throw new AxisFault(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSequenceRefused),
rmsBean.getLastSendError());
}
throw new AxisFault(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.createSequenceRefused));
}
}
// Setup enough of the workers to get this create sequence off the box.
SandeshaUtil.startWorkersForSequence(configCtx, rmsBean);
} else {
rmsBean = null;
}
if (log.isDebugEnabled())
log.debug("Exit: ApplicationMsgProcessor::addCreateSequenceMessage, " + rmsBean);
return rmsBean;
}