public boolean processInMessage()

in modules/core/src/main/java/org/apache/sandesha2/msgprocessors/TerminateSeqMsgProcessor.java [68:232]


	/**
	 * Processes an incoming TerminateSequence message.
	 * <p>
	 * The method validates the message, checks proof of possession against the
	 * sequence bean, cleans up the receiving side, marks the sequence bean as
	 * terminated and then either sends a TerminateSequenceResponse (when the RM
	 * spec version requires one) or tries to send a stored outgoing-side
	 * TerminateSequence with an anonymous address back as the response to this
	 * message. Finally the incoming message context is paused.
	 *
	 * @param terminateSeqRMMsg the RM wrapper around the incoming terminate message
	 * @param transaction the current transaction; it is committed here (when
	 *            non-null and active) only on the path that sends a
	 *            TerminateSequenceResponse, just before the send
	 * @return <code>false</code> if FaultManager reported the sequence as
	 *         unknown, <code>true</code> otherwise
	 * @throws AxisFault a SandeshaException is thrown if the message has no
	 *             TerminateSequence part, if the sequence identifier is missing
	 *             or empty, or if the response cannot be sent; exceptions from
	 *             the called helpers are not caught here
	 */
	public boolean processInMessage(RMMsgContext terminateSeqRMMsg, Transaction transaction) throws AxisFault {

		if (log.isDebugEnabled())
			log.debug("Enter: TerminateSeqMsgProcessor::processInMessage");

		MessageContext terminateSeqMsg = terminateSeqRMMsg.getMessageContext();

		// Processing the terminate message
		TerminateSequence terminateSequence = terminateSeqRMMsg.getTerminateSequence();
		if (terminateSequence == null) {
			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noTerminateSeqPart);
			log.debug(message);
			throw new SandeshaException(message);
		}

		// A TerminateSequence that carries no Identifier is reported as an invalid
		// sequence id (same as an empty one) instead of failing with a
		// NullPointerException.
		String sequenceId = null;
		if (terminateSequence.getIdentifier() != null) {
			sequenceId = terminateSequence.getIdentifier().getIdentifier();
		}
		if (sequenceId == null || "".equals(sequenceId)) {
			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidSequenceID, null);
			log.debug(message);
			throw new SandeshaException(message);
		}

		ConfigurationContext context = terminateSeqMsg.getConfigurationContext();
		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());

		// Look the sequence up as an RMD (receiving side) bean first and fall back
		// to the RMS (sending side) bean when no RMD bean is found.
		RMSequenceBean rmBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
		if (rmBean == null) {
			rmBean = SandeshaUtil.getRMSBeanFromSequenceId(storageManager, sequenceId);
		}

		// Check that the sender of this TerminateSequence holds the correct token.
		// NOTE(review): rmBean is passed on as-is, even if the RMD lookup returned
		// null - confirm that the RMS lookup and assertProofOfPossession cope with that.
		SandeshaUtil.assertProofOfPossession(rmBean, terminateSeqMsg,
				terminateSeqMsg.getEnvelope().getBody());

		if (FaultManager.checkForUnknownSequence(terminateSeqRMMsg, sequenceId, storageManager, false)) {
			if (log.isDebugEnabled())
				log.debug("Exit: TerminateSeqMsgProcessor::processInMessage, unknown sequence");
			return false;
		}

		// Build the terminate sequence response up front if the spec version requires one.
		RMMsgContext terminateSequenceResponse = null;
		if (SpecSpecificConstants.isTerminateSequenceResponseRequired(terminateSeqRMMsg.getRMSpecVersion()))
			terminateSequenceResponse = getTerminateSequenceResponse(terminateSeqRMMsg, rmBean, sequenceId, storageManager);

		setUpHighestMsgNumbers(context, storageManager, sequenceId, terminateSeqRMMsg);

		boolean inOrderInvocation = SandeshaUtil.getDefaultPropertyBean(context.getAxisConfiguration()).isInOrder();

		// With in-order invocation on an RMD sequence, the full termination is only
		// done when every message has already been invoked, i.e. the next message
		// number to process is past the highest received one. In every other case
		// the full termination is always done.
		boolean doFullTermination = true;
		if (inOrderInvocation && rmBean instanceof RMDBean) {
			RMDBean rmdBean = (RMDBean) rmBean;
			doFullTermination = rmdBean.getNextMsgNoToProcess() > rmdBean.getHighestInMessageNumber();
		}

		if (doFullTermination) {
			TerminateManager.cleanReceivingSideAfterInvocation(sequenceId, storageManager);
		}
		// The terminate-message cleanup runs regardless of the termination mode.
		TerminateManager.cleanReceivingSideOnTerminateMessage(context, sequenceId, storageManager);

		// Record the termination on the bean and persist it through the bean
		// manager that matches its concrete type.
		rmBean.setTerminated(true);
		rmBean.setLastActivatedTime(System.currentTimeMillis());
		if (rmBean instanceof RMDBean) {
			storageManager.getRMDBeanMgr().update((RMDBean) rmBean);
		} else {
			storageManager.getRMSBeanMgr().update((RMSBean) rmBean);
		}

		if (terminateSequenceResponse != null) {
			// Sending the terminate sequence response.
			// As we have processed the input and prepared the response we can commit
			// the transaction now.
			if (transaction != null && transaction.isActive()) transaction.commit();

			MessageContext outMessage = terminateSequenceResponse.getMessageContext();
			// The destination is captured before the message is handed to the engine.
			EndpointReference toEPR = outMessage.getTo();

			outMessage.setServerSide(true);

			try {
				AxisEngine.send(outMessage);
			} catch (AxisFault e) {
				if (log.isDebugEnabled())
					log.debug("Unable to send terminate sequence response", e);

				throw new SandeshaException(
						SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendTerminateResponse), e);
			}

			if (toEPR.hasAnonymousAddress()) {
				TransportUtils.setResponseWritten(terminateSeqMsg, true);
			}

		} else {
			// In the RM 1.0 anonymous scenario we try to attach the TerminateSequence
			// of the response side as the response message.

			String outgoingSideInternalSeqId = SandeshaUtil.getOutgoingSideInternalSequenceID(sequenceId);
			SenderBean senderFindBean = new SenderBean();
			senderFindBean.setInternalSequenceID(outgoingSideInternalSeqId);
			senderFindBean.setMessageType(Sandesha2Constants.MessageTypes.TERMINATE_SEQ);
			senderFindBean.setSend(true);
			senderFindBean.setReSend(false);

			SenderBean outgoingSideTerminateBean = storageManager.getSenderBeanMgr().findUnique(senderFindBean);
			if (outgoingSideTerminateBean != null) {

				EndpointReference toEPR = new EndpointReference(outgoingSideTerminateBean.getToAddress());
				if (toEPR.hasAnonymousAddress()) {
					String messageKey = outgoingSideTerminateBean.getMessageContextRefKey();
					MessageContext message = storageManager.retrieveMessageContext(messageKey, context);

					RMMsgContext rmMessage = MsgInitializer.initializeMessage(message);

					// Attach this outgoing terminate message as the response to the
					// incoming terminate message by reusing its outbound transport.
					message.setTransportOut(terminateSeqMsg.getTransportOut());
					message.setProperty(MessageContext.TRANSPORT_OUT, terminateSeqMsg.getProperty(MessageContext.TRANSPORT_OUT));
					message.setProperty(Constants.OUT_TRANSPORT_INFO, terminateSeqMsg.getProperty(Constants.OUT_TRANSPORT_INFO));

					try {
						AxisEngine.send(message);
						TransportUtils.setResponseWritten(terminateSeqMsg, true);
					} catch (AxisFault e) {
						if (log.isDebugEnabled())
							log.debug("Unable to send terminate sequence response", e);

						throw new SandeshaException(
								SandeshaMessageHelper.getMessage(SandeshaMessageKeys.couldNotSendTerminateResponse), e);
					}

					// TODO - should this be here?
					MessageRetransmissionAdjuster.adjustRetransmittion(rmMessage, outgoingSideTerminateBean, context, storageManager);
				}

			}

		}

		terminateSeqMsg.pause();

		if (log.isDebugEnabled())
			log.debug("Exit: TerminateSeqMsgProcessor::processInMessage " + Boolean.TRUE);
		return true;
	}