in modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java [68:232]
/**
 * Processes an incoming TerminateSequence message.
 * <p>
 * The method validates the TerminateSequence part and its sequence identifier, runs the
 * proof-of-possession check, returns early when the sequence is reported as unknown, cleans up
 * the receiving side, marks the sequence bean as terminated and stores it, and finally either
 * sends a TerminateSequenceResponse (when the RM spec version requires one) or tries to send a
 * stored outgoing-side TerminateSequence with an anonymous To address back on the transport of
 * the incoming message. The incoming message context is paused before returning normally.
 *
 * @param terminateSeqRMMsg the RM message context wrapping the incoming TerminateSequence message
 * @param transaction the current transaction; if it is non-null and active it is committed
 *        before a TerminateSequenceResponse is sent
 * @return {@code false} if {@code FaultManager.checkForUnknownSequence} reports the sequence as
 *         unknown, {@code true} otherwise
 * @throws AxisFault in particular a {@code SandeshaException} when the message has no
 *         TerminateSequence part, when the sequence identifier is missing or empty, or when the
 *         response message cannot be sent
 */
public boolean processInMessage(RMMsgContext terminateSeqRMMsg, Transaction transaction) throws AxisFault {
	if (log.isDebugEnabled()) {
		log.debug("Enter: TerminateSeqMsgProcessor::processInMessage");
	}

	MessageContext terminateSeqMsg = terminateSeqRMMsg.getMessageContext();

	// Processing the terminate message
	TerminateSequence terminateSequence = terminateSeqRMMsg.getTerminateSequence();
	if (terminateSequence == null) {
		String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noTerminateSeqPart);
		log.debug(message);
		throw new SandeshaException(message);
	}

	// A TerminateSequence part without an Identifier element is handled like an empty id,
	// so the caller gets the invalidSequenceID fault instead of a NullPointerException.
	String sequenceId = terminateSequence.getIdentifier() == null
			? null
			: terminateSequence.getIdentifier().getIdentifier();
	if (sequenceId == null || "".equals(sequenceId)) {
		String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidSequenceID, null);
		log.debug(message);
		throw new SandeshaException(message);
	}

	ConfigurationContext context = terminateSeqMsg.getConfigurationContext();
	StorageManager storageManager =
			SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());

	// Check that the sender of this TerminateSequence holds the correct token.
	// The receiving-side (RMD) bean is looked up first, then the sending-side (RMS) bean.
	RMSequenceBean rmBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
	if (rmBean == null) {
		rmBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceId);
	}

	// check security credentials
	// NOTE(review): rmBean can still be null here when neither lookup matched, because this
	// check runs before the unknown-sequence check below - confirm the helper tolerates null.
	SandeshaUtil.assertProofOfPossession(rmBean, terminateSeqMsg,
			terminateSeqMsg.getEnvelope().getBody());

	if (FaultManager.checkForUnknownSequence(terminateSeqRMMsg, sequenceId, storageManager, false)) {
		if (log.isDebugEnabled()) {
			log.debug("Exit: TerminateSeqMsgProcessor::processInMessage, unknown sequence");
		}
		return false;
	}

	// add the terminate sequence response if required.
	RMMsgContext terminateSequenceResponse = null;
	if (SpecSpecificConstants.isTerminateSequenceResponseRequired(terminateSeqRMMsg.getRMSpecVersion())) {
		terminateSequenceResponse =
				getTerminateSequenceResponse(terminateSeqRMMsg, rmBean, sequenceId, storageManager);
	}

	setUpHighestMsgNumbers(context, storageManager, sequenceId, terminateSeqRMMsg);

	boolean inOrderInvocation =
			SandeshaUtil.getDefaultPropertyBean(context.getAxisConfiguration()).isInOrder();

	// If the invocation is in-order and the bean is a receiving-side bean, there is a
	// possibility that all the messages have already been invoked; only then is the full
	// termination done. In every other case the full termination is always done.
	boolean doFullTermination = true;
	if (inOrderInvocation && rmBean instanceof RMDBean) {
		RMDBean rmdBean = (RMDBean) rmBean;
		long highestMsgNo = rmdBean.getHighestInMessageNumber();
		long nextMsgToProcess = rmdBean.getNextMsgNoToProcess();
		// all the messages have been invoked once the next number is past the highest one
		doFullTermination = nextMsgToProcess > highestMsgNo;
	}

	if (doFullTermination) {
		TerminateManager.cleanReceivingSideAfterInvocation(sequenceId, storageManager);
	}
	// The on-terminate-message cleanup runs for both the full and the partial termination.
	TerminateManager.cleanReceivingSideOnTerminateMessage(context, sequenceId, storageManager);

	rmBean.setTerminated(true);
	rmBean.setLastActivatedTime(System.currentTimeMillis());
	if (rmBean instanceof RMDBean) {
		storageManager.getRMDBeanMgr().update((RMDBean) rmBean);
	} else {
		storageManager.getRMSBeanMgr().update((RMSBean) rmBean);
	}

	if (terminateSequenceResponse != null) {
		// sending the terminate sequence response
		//
		// As we have processed the input and prepared the response we can commit the
		// transaction now.
		if (transaction != null && transaction.isActive()) {
			transaction.commit();
		}

		MessageContext outMessage = terminateSequenceResponse.getMessageContext();
		EndpointReference toEPR = outMessage.getTo();

		outMessage.setServerSide(true);

		try {
			AxisEngine.send(outMessage);
		} catch (AxisFault e) {
			if (log.isDebugEnabled()) {
				log.debug("Unable to send terminate sequence response", e);
			}
			throw new SandeshaException(
					SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendTerminateResponse), e);
		}

		// The response has already been sent at this point; a response without a To EPR
		// must not turn into a NullPointerException here.
		if (toEPR != null && toEPR.hasAnonymousAddress()) {
			TransportUtils.setResponseWritten(terminateSeqMsg, true);
		}
	} else {
		// If this is the RM 1.0 anonymous scenario we will be trying to attach the
		// TerminateSequence of the response side as the response message.
		String outgoingSideInternalSeqId = SandeshaUtil.getOutgoingSideInternalSequenceID(sequenceId);

		SenderBean senderFindBean = new SenderBean();
		senderFindBean.setInternalSequenceID(outgoingSideInternalSeqId);
		senderFindBean.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ);
		senderFindBean.setSend(true);
		senderFindBean.setReSend(false);

		SenderBean outgoingSideTerminateBean = storageManager.getSenderBeanMgr().findUnique(senderFindBean);
		if (outgoingSideTerminateBean != null) {
			EndpointReference toEPR = new EndpointReference(outgoingSideTerminateBean.getToAddress());
			if (toEPR.hasAnonymousAddress()) {
				String messageKey = outgoingSideTerminateBean.getMessageContextRefKey();
				MessageContext message = storageManager.retrieveMessageContext(messageKey, context);

				RMMsgContext rmMessage = MsgInitializer.initializeMessage(message);

				// attaching this outgoing terminate message as the
				// response to the incoming terminate message.
				message.setTransportOut(terminateSeqMsg.getTransportOut());
				message.setProperty(MessageContext.TRANSPORT_OUT,
						terminateSeqMsg.getProperty(MessageContext.TRANSPORT_OUT));
				message.setProperty(Constants.OUT_TRANSPORT_INFO,
						terminateSeqMsg.getProperty(Constants.OUT_TRANSPORT_INFO));

				try {
					AxisEngine.send(message);
					TransportUtils.setResponseWritten(terminateSeqMsg, true);
				} catch (AxisFault e) {
					if (log.isDebugEnabled()) {
						log.debug("Unable to send terminate sequence response", e);
					}
					throw new SandeshaException(
							SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendTerminateResponse), e);
				}

				// TODO - should this be here?
				MessageRetransmissionAdjuster.adjustRetransmittion(rmMessage, outgoingSideTerminateBean,
						context, storageManager);
			}
		}
	}

	terminateSeqMsg.pause();

	if (log.isDebugEnabled()) {
		log.debug("Exit: TerminateSeqMsgProcessor::processInMessage " + Boolean.TRUE);
	}
	return true;
}