public boolean processOutMessage()

in modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java [148:589]


	/**
	 * Processes an outgoing application message so that it can be sent on a
	 * reliable sequence.
	 * <p>
	 * In order, this method:
	 * <ol>
	 * <li>propagates any configured {@code SandeshaListener} to the operation context
	 *     and checks the To address;</li>
	 * <li>resolves the internal sequence id and validates a user-supplied message
	 *     number, if any;</li>
	 * <li>for an existing RMSBean, follows a reallocation to its replacement
	 *     sequence, and either auto-starts a new sequence or throws when the
	 *     sequence is closed, terminated or timed out;</li>
	 * <li>sets up a new sending sequence (and its CreateSequence message) when no
	 *     RMSBean exists;</li>
	 * <li>works out the message number, last-message flag, expected replies,
	 *     replyTo/faultTo rewriting and polling mode, then updates the RMSBean;</li>
	 * <li>ensures an envelope, message id and default actions are present, hands
	 *     non-dummy messages to {@code processResponseMessage}, pauses the message
	 *     context and commits the transaction.</li>
	 * </ol>
	 *
	 * @param rmMsgCtx the RM wrapper around the outgoing message context
	 * @param tran the transaction to work under; it is stored in the
	 *        {@code appMsgProcTran} field and, once the store-updating section has
	 *        been entered, committed on success or rolled back if still active
	 * @return always {@code true}
	 * @throws AxisFault (as a {@code SandeshaException}) if the supplied message
	 *         number is not positive or not larger than the last used number, if
	 *         the sequence cannot be used (reallocation failed, closed, terminated,
	 *         timed out, or mixed anonymous/non-anonymous EPRs are forbidden), if
	 *         the SOAP body is missing, or if a called helper fails
	 */
	public boolean processOutMessage(RMMsgContext rmMsgCtx, Transaction tran) throws AxisFault {
		if (log.isDebugEnabled())
			log.debug("Enter: ApplicationMsgProcessor::processOutMessage");

		appMsgProcTran= tran;
		MessageContext msgContext = rmMsgCtx.getMessageContext();
		ConfigurationContext configContext = msgContext.getConfigurationContext();
		
		//Please note: no need to check that RM1.0 anon out-in has a sequence offer, since we actually force an offer in this case
		
		// setting the Fault callback
		SandeshaListener faultCallback = (SandeshaListener) msgContext.getOptions().getProperty(
				SandeshaClientConstants.SANDESHA_LISTENER);
		if (faultCallback != null) {
			OperationContext operationContext = msgContext.getOperationContext();
			if (operationContext != null) {
				operationContext.setProperty(SandeshaClientConstants.SANDESHA_LISTENER, faultCallback);
			}
		}

		StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configContext, configContext
				.getAxisConfiguration());

		//check the TO address is ok
		SandeshaUtil.getEPRDecorator(configContext).checkEndpointReference(msgContext.getTo());
		
		boolean serverSide = msgContext.isServerSide();

		// setting message Id if null
		if (msgContext.getMessageID() == null)
			msgContext.setMessageID(SandeshaUtil.getUUID());

		/*
		 * Internal sequence id is the one used to refer to the sequence (since
		 * actual sequence id is not available when first msg arrives) server
		 * side - a derivation of the sequenceId of the incoming sequence client
		 * side - a derivation of wsaTo & SequenceKey
		 */
		String internalSequenceId = getSequenceID(rmMsgCtx, serverSide, false); //get a sequenceID, possibly pre-existing
		if (log.isDebugEnabled())
			log.debug("Enter: ApplicationMsgProcessor::internalSequenceId = " + internalSequenceId);
		
		// On the client side the application flags the last message via a property.
		boolean lastMessage = false;
		if(!serverSide){
			String lastAppMessage = (String) msgContext.getProperty(SandeshaClientConstants.LAST_MESSAGE);
			if ("true".equals(lastAppMessage)){
				lastMessage = true;
				if (log.isDebugEnabled())
					log.debug("Enter: ApplicationMsgProcessor: last message");
			}
		}
		
		if (internalSequenceId!=null)
			rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceId);

		/*
		 * Checking whether the user has given the messageNumber. In most cases
		 * this will not be so, and the system will generate the message numbers.
		 */

		// User should set it as a long object.
		Long messageNumberLng = (Long) msgContext.getProperty(SandeshaClientConstants.MESSAGE_NUMBER);

		// -1 means "no message number supplied by the user".
		long givenMessageNumber = -1;
		if (messageNumberLng != null) {
			givenMessageNumber = messageNumberLng.longValue();
			if (givenMessageNumber <= 0) {
				throw new SandeshaException(SandeshaMessageHelper.getMessage(
						SandeshaMessageKeys.msgNumberMustBeLargerThanZero, Long.toString(givenMessageNumber)));
			}
		}

		// A dummy message is one which will not be processed as an actual
		// application message.
		// The RM handlers will simply let these go.
		String dummyMessageString = (String) msgContext.getOptions().getProperty(SandeshaClientConstants.DUMMY_MESSAGE);
		boolean dummyMessage = Sandesha2Constants.VALUE_TRUE.equals(dummyMessageString);

		RMSBean rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);

		boolean autoStartNewSeqForReallocation = false;
		//if this is an existing sequence then we need to do some checks first
		if(rmsBean != null)
		{
			//If the sequence has been reallocated we need to find out the new internalSeqID.
			//If the internalSeqID hasn't been set yet we should auto restart.  If it has a new
			//internalSeqID we just send the message on the new reallocated sequence. 
			int seqReallocated = rmsBean.isReallocated();
			if(seqReallocated == Sandesha2Constants.WSRM_COMMON.REALLOCATED){
				if (log.isDebugEnabled())
					log.debug("ApplicationMsgProcessor: Reallocated Sequence: " + rmsBean.getSequenceID());
				//Try and get the new internalSeqID
				internalSequenceId = rmsBean.getInternalSeqIDOfSeqUsedForReallocation();
				if(internalSequenceId != null){
					if (log.isDebugEnabled())
						log.debug("ApplicationMsgProcessor: InternalSeqID of new sequence: " + internalSequenceId);
					rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID, internalSequenceId);
					// From here on we work with the bean of the replacement sequence.
					rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
				} else {
					autoStartNewSeqForReallocation = true;
				}
			} else if(seqReallocated == Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED){
				//We can't do anymore as we have already tried to reallocate this sequence.
				throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.reallocationFailed, rmsBean.getSequenceID(),
						"We have already attempted to reallocate this Sequence and we won't try again.  The sequance needs to be cleaned up manually."));
			}

			//see if the sequence is closed
			if(rmsBean.isSequenceClosedClient() || rmsBean.isTerminateAdded() || rmsBean.isTimedOut() || autoStartNewSeqForReallocation){
				if(SandeshaUtil.isAutoStartNewSequence(msgContext)){
					internalSequenceId = getSequenceID(rmMsgCtx, serverSide, true); //require a new sequence
					if(autoStartNewSeqForReallocation){
						if (log.isDebugEnabled())
							log.debug("ApplicationMsgProcessor: autoStartNewSeqForReallocation: InternalSeqID of new sequence used for reallocation: " 
										+ internalSequenceId);
						// Record on the old bean which sequence replaces it.
						rmsBean.setInternalSeqIDOfSeqUsedForReallocation(internalSequenceId);
						storageManager.getRMSBeanMgr().update(rmsBean);	
					}
					if (log.isDebugEnabled())
						log.debug("ApplicationMsgProcessor: auto start new sequence " + internalSequenceId + " :: " + rmsBean);
					//set this new internal sequence ID on the msg
					rmMsgCtx.setProperty(Sandesha2Constants.MessageContextProperties.INTERNAL_SEQUENCE_ID,internalSequenceId);
					// Clearing the bean makes the code below set up a new sequence.
					rmsBean = null;
				}
				else if(rmsBean.isSequenceClosedClient()){
					throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceClosed, internalSequenceId));
				}
				else if(rmsBean.isTerminateAdded()){
					throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceTerminated, internalSequenceId));
				}
				else if(rmsBean.isTimedOut()){
					throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsSequenceTimedout, internalSequenceId));
				}
			}
			else if(!msgContext.isServerSide())
			{
				//we need to check that any outgoing request msg is not using a different MEP than the sequence was established with
				//The easiest way to check this is to ensure that the request and the replyTo of the RMS bean do not
				//differ between non-anonymous and anonymous (either MC anonymous or WS-A anonymous)
				boolean msgIsAnon = !AddressingHelper.isReplyRedirected(msgContext);
				boolean isSequenceAnon = rmsBean.getReplyToEndpointReference() ==null || 
											rmsBean.getReplyToEndpointReference().getAddress() ==null || 
											rmsBean.getReplyToEndpointReference().hasAnonymousAddress();
				if(msgIsAnon != isSequenceAnon && SandeshaUtil.isForbidMixedEPRsOnSequence(msgContext))
				{
					String msg = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.cannotSendMsgAsEPRWrong, msgContext.toString(), rmsBean.getSequenceID());
					log.warn(msg);
					throw new SandeshaException(msg);
				}				
			}
		}
		if (log.isDebugEnabled())
			log.debug("Enter: ApplicationMsgProcessor:: initial sequence checks pass");
		
		// For a client-side 2-way (out-in) MEP the RM spec version is resolved here,
		// before we start creating a new Sequence.
		// NOTE(review): the resolved version is not used afterwards - no MakeConnection
		// check is performed on it in this method. The lookup is left in place in case
		// getSpecVersion has effects (or throws) that callers rely on; confirm before removing.
		if(!serverSide) {
			AxisOperation op = msgContext.getAxisOperation();
			int mep = WSDLConstants.MEP_CONSTANT_INVALID;
			if(op != null) {
				mep = op.getAxisSpecificMEPConstant();
			}
			if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN) {
				String specVersion = null;
				if(rmsBean == null) {
					specVersion = SequenceManager.getSpecVersion(msgContext, storageManager);
				} else {
					specVersion = rmsBean.getRMVersion();
				}
			}
		}

		//setting the reference msg store key.
		if (rmsBean!=null && rmsBean.getReferenceMessageStoreKey()==null) {
			//setting this application message as the reference, if it hasn't already been set.
			
			String referenceMsgKey = SandeshaUtil.getUUID();
			storageManager.storeMessageContext(referenceMsgKey, msgContext);
			rmsBean.setReferenceMessageStoreKey(referenceMsgKey);
		}
		
		String outSequenceID = null;


		// Work out if there is a user transaction involved before updating any store state
		// to give any storage manager interface a chance to setup any transactional state
		boolean hasUserTransaction = storageManager.hasUserTransaction(msgContext);
		
		try {
			
			if (rmsBean == null) { 
				// SENDING THE CREATE SEQUENCE.
				if (log.isDebugEnabled())
					log.debug("Enter: ApplicationMsgProcessor:: sending createSequence");
				while (rmsBean == null) {
					// There is a timing window where 2 sending threads can hit this point
					// at the same time and both will create an RMSBean to the same endpoint
					// with the same internal sequenceid
					// Check that someone hasn't created the bean
					rmsBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, internalSequenceId);
	
					// if first message - setup the sending side sequence - both for the
					// server and the client sides.
					if (rmsBean == null) {
						rmsBean = SequenceManager.setupNewClientSequence(msgContext, internalSequenceId, storageManager);
						rmsBean = addCreateSequenceMessage(rmMsgCtx, rmsBean, storageManager);

						if (rmsBean != null) {
							// Only a bean that was actually created can be marked; a null result
							// means we must retry, so it must not be dereferenced here.
							if(autoStartNewSeqForReallocation){
								rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.RMS_BEAN_USED_FOR_REALLOCATION);
							}
							outSequenceID = rmsBean.getSequenceID();
						} else if (appMsgProcTran != null && appMsgProcTran.isActive()) {
							// Rollback the current locks.
							appMsgProcTran.rollback();
	
							// Create a new tran.  This avoids a potential deadlock where the RMS/RMDBeans
							// are taken in reverse order.
							appMsgProcTran = storageManager.getTransaction();
						}
					}
				}
	
			} else {
				outSequenceID = rmsBean.getSequenceID();
			}
			
			// the message number that was last used.
			long systemMessageNumber = rmsBean.getNextMessageNumber();
	
			// The number given by the user has to be larger than the last stored
			// number.
			if (givenMessageNumber > 0 && givenMessageNumber <= systemMessageNumber) {
				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.msgNumberNotLargerThanLastMsg, Long
						.toString(givenMessageNumber));
				throw new SandeshaException(message);
			}
	
			// Finding the correct message number.
			long messageNumber = -1;
			if (givenMessageNumber > 0) // if given message number is valid use it.
										// (this is larger than the last stored due
										// to the last check)
				messageNumber = givenMessageNumber;
			else if (systemMessageNumber > 0) { // if system message number is valid
												// use it.
				messageNumber = systemMessageNumber + 1;
			} else { // This is the first message (systemMessageNumber = -1)
				messageNumber = 1;
			}
			
			if (log.isDebugEnabled())
				log.debug("Enter: ApplicationMsgProcessor::message number=" + messageNumber);
	
			if (serverSide) {
				// Deciding whether this is the last message. We assume it is if it relates to
				// a message which arrived with the LastMessage flag on it. 
				// NOTE(review): rmdBean is dereferenced without a null check - confirm that a
				// bean always exists for the inbound sequence on this path.
				RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, inboundSequence);			
				// Get the last in message
				String lastRequestId = rmdBean.getLastInMessageId();
				RelatesTo relatesTo = msgContext.getRelatesTo();
				if(relatesTo != null && lastRequestId != null &&
						lastRequestId.equals(relatesTo.getValue())) {
					lastMessage = true;
				}
				
				//or a constant property may call it as the last msg
				Boolean inboundLast = (Boolean) msgContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_LAST_MESSAGE); 
				if (inboundLast!=null && inboundLast.booleanValue())
					lastMessage = true;
			}
			
			if (lastMessage) {
				rmsBean.setLastOutMessage(messageNumber);
				
				if (msgContext.getTo() == null || (msgContext.getTo() != null && msgContext.getTo().isWSAddressingAnonymous())) {
				  if (log.isDebugEnabled())
				    log.debug("Rewriting anonymous EPR for LastMessage to the one stored in the RMSBean");
				  msgContext.setTo(rmsBean.getToEndpointReference());
				}
			}
	
			// set this as the response highest message.
			rmsBean.setHighestOutMessageNumber(messageNumber);
			
			// saving the used message number
			//Save the expected replies if it's not a dummy msg and it's an outIn MEP
			String specVersion = SequenceManager.getSpecVersion(rmMsgCtx.getMessageContext(), storageManager);
			if (!dummyMessage) {
				rmsBean.setNextMessageNumber(messageNumber);
				
				// Identify the MEP associated with the message.
				AxisOperation op = msgContext.getAxisOperation();
				int mep = WSDLConstants.MEP_CONSTANT_INVALID;
				if(op != null) {
					mep = op.getAxisSpecificMEPConstant();
				}
				
				EndpointReference replyTo = msgContext.getReplyTo();
				
				if(mep == WSDLConstants.MEP_CONSTANT_OUT_IN){
					// We only match up requests and replies when we are doing sync interactions
					if (log.isDebugEnabled()) log.debug("MEP OUT_IN");	
					if(replyTo == null || replyTo.hasAnonymousAddress()) {
						long expectedReplies = rmsBean.getExpectedReplies();
						rmsBean.setExpectedReplies(expectedReplies + 1);
					}
					
					// If we support the RM anonymous URI then rewrite the ws-a anon to use the RM equivalent.
					//(this should be done only for WSRM 1.1)
					if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) {
						if (log.isDebugEnabled()) log.debug("SPEC_1_1");
						String oldAddress = (replyTo == null) ? null : replyTo.getAddress();
						EndpointReference newReplyTo = SandeshaUtil.rewriteEPR(rmsBean, msgContext
								.getReplyTo(), configContext);
						String newAddress = (newReplyTo == null) ? null : newReplyTo.getAddress();
						// Only touch the message when the rewrite produced a different address.
						if(newAddress != null && !newAddress.equals(oldAddress)){
							msgContext.setReplyTo(newReplyTo);
						}
					}
				}
				// Set the faultTo to anonymous to make sure we get Sandesha faults back.
				if(mep == WSDLConstants.MEP_CONSTANT_OUT_ONLY
					|| (Sandesha2Constants.SPEC_VERSIONS.v1_0.equals(specVersion) && replyTo == null)) {
					if (log.isDebugEnabled()) 
						log.debug("Setting the faultTo to anonymous as a oneWay MEP is being used and fault msgs can then be delivered back on the backchannel");
					if(msgContext.getFaultTo() == null)
						msgContext.setFaultTo(new EndpointReference(AddressingConstants.Final.WSA_ANONYMOUS_URL));
				}
			} 
			
			boolean startPolling = false;
			// For RM 1.1 with MakeConnection enabled, switch the sequence to polling mode
			// when its acksTo EPR is absent or anonymous.
			if (Sandesha2Constants.SPEC_VERSIONS.v1_1.equals(specVersion)) {
				SandeshaPolicyBean policy = SandeshaUtil.getPropertyBean(msgContext.getConfigurationContext().getAxisConfiguration());
				if(policy.isEnableMakeConnection()) {
					EndpointReference reference = rmsBean.getAcksToEndpointReference();
					if(reference == null || reference.hasAnonymousAddress()) {
						rmsBean.setPollingMode(true);
						startPolling = true;
						//ensure addressing is turned on since we require it if we want to use makeConnection
						if (log.isDebugEnabled()) log.debug("Ensuring that WS-A is enabled for msg " + msgContext);
						msgContext.setProperty(AddressingConstants.DISABLE_ADDRESSING_FOR_OUT_MESSAGES,Boolean.FALSE);
					}
				}
			}

			if (log.isDebugEnabled()) log.debug("App msg using replyTo EPR as " + msgContext.getReplyTo() + " and faultTo EPR as " + msgContext.getFaultTo());
			
			RelatesTo relatesTo = msgContext.getRelatesTo();
			if(relatesTo != null) {
				rmsBean.setHighestOutRelatesTo(relatesTo.getValue());
			}
	
			// setting async ack endpoint for the server side. (if present)
			if (serverSide) {
				if (rmsBean.getToEndpointReference() != null) {
					msgContext.setProperty(SandeshaClientConstants.AcksTo, rmsBean.getToEndpointReference().getAddress());
				}
			}
	
			// Update the rmsBean
			storageManager.getRMSBeanMgr().update(rmsBean);
			
			// Workers are started only after the bean (with polling mode set) has been stored.
			if(startPolling) {
				SandeshaUtil.startWorkersForSequence(msgContext.getConfigurationContext(), rmsBean);
			}
			
			
			// Make sure the message has an envelope of the matching SOAP version.
			int SOAPVersion = Sandesha2Constants.SOAPVersion.v1_1;
			if (!msgContext.isSOAP11())
				SOAPVersion = Sandesha2Constants.SOAPVersion.v1_2;
			if (msgContext.getEnvelope() == null) {
				try {
					msgContext.setEnvelope(SOAPAbstractFactory.getSOAPFactory(
							SOAPVersion).getDefaultEnvelope());
				} catch (AxisFault e) {
					// NOTE(review): only the message is propagated, the original cause is dropped;
					// consider chaining it if SandeshaException offers a cause-taking constructor.
					throw new SandeshaException(e.getMessage());
				}
			}
	
			SOAPBody soapBody = rmMsgCtx.getSOAPEnvelope().getBody();
			if (soapBody == null) {
				String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.soapBodyNotPresent);
				log.debug(message);
				throw new SandeshaException(message);
			}
	
			
			if (rmMsgCtx.getMessageId() == null) {
				String messageId1 = SandeshaUtil.getUUID();
				rmMsgCtx.setMessageId(messageId1);
			}
	
			EndpointReference toEPR = msgContext.getTo();
	
			
			if (toEPR != null) {
				// setting default actions.
				if (log.isDebugEnabled())
					log.debug("Enter: ApplicationMsgProcessor::setting default actions");
				String to = toEPR.getAddress();
				// NOTE(review): the operation context is dereferenced without a null check here,
				// although it is null-checked near the top of this method - confirm it is always set.
				String operationName = msgContext.getOperationContext().getAxisOperation().getName().getLocalPart();
				if (msgContext.getWSAAction() == null) {
					msgContext.setWSAAction(to + "/" + operationName);
				}
				if (msgContext.getSoapAction() == null) {
					msgContext.setSoapAction("\"" + to + "/" + operationName + "\"");
				}
			}
			
			// processing the response if not a dummy.
			if (!dummyMessage){
				String storageKey = SandeshaUtil.getUUID(); 
				// NOTE(review): the caller's original 'tran' is passed on, not appMsgProcTran, which
				// may have been replaced after a rollback in the create-sequence loop - confirm intent.
				processResponseMessage(rmMsgCtx, rmsBean, internalSequenceId, outSequenceID, messageNumber, storageKey, storageManager, tran, hasUserTransaction);
			}
			//Users won't be able to get reliable response msgs in the back channel of a 
			//reliable message. If he doesn't have an endpoint he should use polling mechanisms.
			msgContext.pause();
			
			if (appMsgProcTran != null && appMsgProcTran.isActive()) {
				appMsgProcTran.commit();
				// Clearing the field stops the finally block from touching the committed tran.
				appMsgProcTran = null;
			}
		}

		finally {
			// Anything still active at this point did not reach the commit above.
			if (appMsgProcTran != null && appMsgProcTran.isActive())
				appMsgProcTran.rollback();
		}
		
		if (log.isDebugEnabled())
			log.debug("Exit: ApplicationMsgProcessor::processOutMessage " + Boolean.TRUE);

		return true;
	}