private boolean invokeMessage()

in modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java [207:383]


	private boolean invokeMessage(Transaction tran) {
		if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::invokeMessage");

		Transaction transaction = null;
		MessageContext msgToInvoke = null;
		boolean messageInvoked = true;
		
		// If we are not the holder of the correct lock, then we have to stop
		if(lock != null && (!lock.ownsLock(workId, this))) {
			if (log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run, another worker holds the lock");
			return false;
		}
		
		try {
			
			StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
			InvokerBeanMgr invokerBeanMgr = storageManager.getInvokerBeanMgr();
			
			//starting a transaction
			if(tran == null) {
				transaction = storageManager.getTransaction();
			} else {
				transaction = tran;
			}
			
			InvokerBean invokerBean = invokerBeanMgr.retrieve(messageContextKey);

			msgToInvoke = storageManager.retrieveMessageContext(messageContextKey, configurationContext);
			if(msgToInvoke==null){
				//return since there is nothing to do
				if(log.isDebugEnabled()) log.debug("null msg");
				return false;
			}

			// ending the transaction before invocation.
			if(transaction != null) {
				transaction.commit();
				transaction = storageManager.getTransaction();
			}

			RMMsgContext rmMsg = MsgInitializer.initializeMessage(msgToInvoke);

			// Lock the RMD Bean just to avoid deadlocks
			RMDBean rMDBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, invokerBean.getSequenceID());

			boolean highestMessage = false;

			if(!ignoreNextMsg){
				// updating the next msg to invoke
				long nextMsgNo = rMDBean.getNextMsgNoToProcess();
				
				if (!(invokerBean.getMsgNo()==nextMsgNo)) {
					//someone else has invoked this before us - this run should now stop
					if(log.isDebugEnabled()) log.debug("Operated message number is different from the Next Message Number to invoke");
					return false;
				}
				
				nextMsgNo++;
				rMDBean.setNextMsgNoToProcess(nextMsgNo);
				storageManager.getRMDBeanMgr().update(rMDBean);
			}
			
			// Check if this is the last message
			if (rmMsg.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
				Sequence sequence = rmMsg.getSequence();
				
				if (sequence.getLastMessage()) {
					//this will work for RM 1.0 only
					highestMessage = true;
				} else {
					if (rMDBean!=null && rMDBean.isTerminated()) {
						long highestInMsgNo = rMDBean.getHighestInMessageNumber();
						if (invokerBean.getMsgNo()==highestInMsgNo)
							highestMessage = true;
					}
				}
			}

			// Depending on the transaction  support, the service will be invoked only once. 
			// Therefore we delete the invoker bean and message now, ahead of time
			invokerBeanMgr.delete(messageContextKey);
			// removing the corresponding message context as well.
			storageManager.removeMessageContext(messageContextKey);

			try {

				boolean postFailureInvocation = false;

				// StorageManagers should st following property to
				// true, to indicate that the message received comes
				// after a failure.
				String postFaulureProperty = (String) msgToInvoke
						.getProperty(Sandesha2Constants.POST_FAILURE_MESSAGE);
				if (postFaulureProperty != null
						&& Sandesha2Constants.VALUE_TRUE.equals(postFaulureProperty))
					postFailureInvocation = true;

				InvocationResponse response = null;
				if (postFailureInvocation) {
					makeMessageReadyForReinjection(msgToInvoke);
					if (log.isDebugEnabled())
						log.debug("Receiving message, key=" + messageContextKey + ", msgCtx="
								+ msgToInvoke.getEnvelope().getHeader());
					response = AxisEngine.receive(msgToInvoke);
				} else {
					if (log.isDebugEnabled())
						log.debug("Resuming message, key=" + messageContextKey + ", msgCtx="
								+ msgToInvoke.getEnvelope().getHeader());
					msgToInvoke.setPaused(false);
					response = AxisEngine.resumeReceive(msgToInvoke);
				}

				if(!InvocationResponse.SUSPEND.equals(response)) {
					// Performance work - need to close the XMLStreamReader to prevent GC thrashing.
					SOAPEnvelope env = msgToInvoke.getEnvelope();
					if(env!=null){
						StAXBuilder sb = (StAXBuilder)msgToInvoke.getEnvelope().getBuilder();
						if(sb!=null){
							sb.close();
						}
					}
				}

				if (transaction != null && transaction.isActive()) {
					transaction.commit();
					transaction = storageManager.getTransaction();
				}

				if (highestMessage) {
					//do cleaning stuff that hs to be done after the invocation of the last message.
					TerminateManager.cleanReceivingSideAfterInvocation(invokerBean.getSequenceID(), storageManager);
					// exit from current iteration. (since an entry
					// was removed)
					if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::invokeMessage Last message return " + messageInvoked);					
					return messageInvoked;
				}

			} catch (SandeshaStorageTransientException e){
				if (log.isDebugEnabled())
					log.debug("SandeshaStorageTransientException :", e);
				
				if (transaction != null && transaction.isActive())
					transaction.rollback();
				messageInvoked = false;
				
			} catch (Exception e) {
				if (log.isDebugEnabled())
					log.debug("Exception :", e);

				if (transaction != null && transaction.isActive())
					transaction.rollback();
				messageInvoked = false;

				handleFault(rmMsg, e);
				
			}
			if(transaction != null && transaction.isActive()) transaction.commit();
			transaction = null;
			
		} catch (Exception e) {
			if (log.isErrorEnabled())
				log.error(e.toString(), e);
			messageInvoked = false;
		} finally {
			if (transaction!=null && transaction.isActive()) {
				try {
					transaction.rollback();
				} catch (SandeshaStorageException e) {
					if (log.isWarnEnabled())
						log.warn("Caught exception rolling back transaction", e);
				}
			}
		}
		
		if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::invokeMessage " + messageInvoked);
		return messageInvoked;
	}