protected boolean internalRun()

in modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java [209:374]


	protected boolean internalRun() {
		if (log.isDebugEnabled()) log.debug("Enter: Invoker::internalRun");
		
		boolean sleep = false;
		Transaction transaction = null;

		try {
			RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();

			InvokerBeanMgr storageMapMgr = storageManager
					.getInvokerBeanMgr();

			transaction = storageManager.getTransaction();
			
			// Pick a sequence using a round-robin approach
			ArrayList<SequenceEntry> allSequencesList = getSequences();
			int size = allSequencesList.size();
			log.debug("Choosing one from " + size + " sequences");
			if(nextIndex >= size) {
				nextIndex = 0;

				// We just looped over the set of sequences. If we didn't process any
				// messages on this loop then we sleep before the next one
				if(size == 0 || !processedMessage) {
					sleep = true;
				}
				processedMessage = false;
				
				if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, looped over all sequences, sleep " + sleep);
				
				if(transaction != null && transaction.isActive()) transaction.commit();
				transaction = null;
				
				return sleep;
			}

			SequenceEntry entry = (SequenceEntry) allSequencesList.get(nextIndex++);
			String sequenceId = entry.getSequenceId();
			log.debug("Chose sequence " + sequenceId);

			RMDBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
			if (nextMsgBean == null) {
				log.debug("Next message not set correctly. Removing invalid entry.");

				stopThreadForSequence(sequenceId, entry.isRmSource());
				allSequencesList = getSequences();
				if (allSequencesList.size() == 0)
					sleep = true;

				if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, sleep " + sleep);
				
				if(transaction != null && transaction.isActive()) transaction.commit();
				transaction = null;

				return sleep;
			}

			long nextMsgno = nextMsgBean.getNextMsgNoToProcess();
			if (nextMsgno <= 0) {
				// Make sure we sleep on the next loop, so that we don't spin in a tight loop
				sleep = true;
				if (log.isDebugEnabled())
					log.debug("Invalid Next Message Number " + nextMsgno);
				String message = SandeshaMessageHelper.getMessage(
						SandeshaMessageKeys.invalidMsgNumber, Long
								.toString(nextMsgno));
				throw new SandeshaException(message);
			}

			InvokerBean selector = new InvokerBean();
			selector.setSequenceID(sequenceId);
			selector.setMsgNo(nextMsgno);
			List<InvokerBean> invokerBeans = storageMapMgr.find(selector);
			
			//add any msgs that belong to out of order windows
			addOutOfOrderInvokerBeansToList(sequenceId, 
					storageManager, invokerBeans);
			
			// If there aren't any beans to process then move on to the next sequence
			if (invokerBeans.size() == 0) {
				if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, no beans to invoke on sequence " + sequenceId + ", sleep " + sleep);
				
				if(transaction != null && transaction.isActive()) transaction.commit();
				transaction = null;

				return sleep;
			}
			
			Iterator<InvokerBean> stMapIt = invokerBeans.iterator();

			//TODO correct the locking mechanism to have one lock per sequence.
			//TODO should this be a while, not an if?
			if (stMapIt.hasNext()) { //some invokation work is present
				
				InvokerBean bean = (InvokerBean) stMapIt.next();
				//see if this is an out of order msg
				boolean beanIsOutOfOrderMsg = bean.getMsgNo()!=nextMsgno;
				
				String workId = sequenceId; 
									
				//check whether the bean is already assigned to a worker.
				if (getWorkerLock().isWorkPresent(workId)) {
					// As there is already a worker assigned we are probably dispatching
					// messages too quickly, so we sleep before trying the next sequence.
					sleep = true;
					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, workId);
					if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, " + message + ", sleep " + sleep);
					
					if(transaction != null) {
						transaction.commit();
						transaction = null;
					}
					
					return sleep;
				}

				if(transaction != null) {
					transaction.commit();
					transaction = null;
				}

				// start a new worker thread and let it do the invocation.
				InvokerWorker worker = new InvokerWorker(context, bean);
				if(beanIsOutOfOrderMsg) worker.forceOutOfOrder();
				worker.setPooled();
				worker.setWorkId(workId);
				
				// Wrap the invoker worker with the correct context, if needed.
				Runnable work = worker;
				ContextManager contextMgr = SandeshaUtil.getContextManager(context);
				if(contextMgr != null) {
					work = contextMgr.wrapWithContext(work, bean.getContext());
				}
				try {
					// Set the lock up before we start the thread, but roll it back
					// if we hit any problems
					if(worker.getLock().addWork(workId, worker))
						threadPool.execute(work);
				} catch(Exception e) {
					worker.getLock().removeWork(workId);
				}				
				processedMessage = true;
			}
			
			if(transaction != null && transaction.isActive()) transaction.commit();
			transaction = null;
		} catch (Exception e) {
			String message = SandeshaMessageHelper
					.getMessage(SandeshaMessageKeys.invokeMsgError);
			if(log.isDebugEnabled()) log.debug(message, e);
		} finally {
			if (transaction != null && transaction.isActive()) {
				try {
					transaction.rollback();
				} catch (Exception e) {
					String message = SandeshaMessageHelper.getMessage(
							SandeshaMessageKeys.rollbackError, e.toString());
					if(log.isDebugEnabled()) log.debug(message, e);
				}
			}
		}

		if (log.isDebugEnabled())
			log.debug("Exit: InOrderInvoker::internalRun");
		return sleep;
	}