public static synchronized void forceInvokeOfAllMessagesCurrentlyOnSequence()

in modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java [73:175]


	public static synchronized void forceInvokeOfAllMessagesCurrentlyOnSequence(ConfigurationContext ctx, 
			String sequenceID,
			boolean allowLaterDeliveryOfMissingMessages)throws SandeshaException{
//		//first we block while we wait for the invoking thread to pause
//		blockForPause();
		try{
			StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(ctx, ctx.getAxisConfiguration());
			//get all invoker beans for the sequence
			InvokerBeanMgr storageMapMgr = storageManager
					.getInvokerBeanMgr();
			RMDBeanMgr rmdBeanMgr = storageManager.getRMDBeanMgr();
			RMDBean rMDBean = rmdBeanMgr.retrieve(sequenceID);
			
			if (rMDBean != null) {
				
				//The outOfOrder window is the set of known sequence messages (including those
				//that are missing) at the time the button is pressed.
				long firstMessageInOutOfOrderWindow = rMDBean.getNextMsgNoToProcess();
			
				InvokerBean selector = new InvokerBean();
				selector.setSequenceID(sequenceID);
				Iterator<InvokerBean> stMapIt = storageMapMgr.find(selector).iterator();
				
				long highestMsgNumberInvoked = 0;
				Transaction transaction = null;
				
				//invoke each bean in turn. 
				//NOTE: here we are breaking ordering
				while(stMapIt.hasNext()){
					//invoke the app
					try{
						transaction = storageManager.getTransaction();
						InvokerBean invoker = (InvokerBean)stMapIt.next();
						
						// start a new worker thread and let it do the invocation.
						String workId = sequenceID;
						
						InvokerWorker worker = new InvokerWorker(ctx, invoker);
						worker.forceOutOfOrder();
						worker.setPooled();
						worker.setWorkId(workId);
						
						// Wrap the invoker worker with the correct context, if needed.
						Runnable work = worker;
						ContextManager contextMgr = SandeshaUtil.getContextManager(ctx);
						if(contextMgr != null) {
							work = contextMgr.wrapWithContext(work, invoker.getContext());
						}
						
						// Setup the lock for the new worker
						worker.getLock().addWork(workId, worker);
						ctx.getThreadPool().execute(work);

						long msgNumber = invoker.getMsgNo();
						//if necessary, update the "next message number" bean under this transaction
						if(msgNumber>highestMsgNumberInvoked){
							highestMsgNumberInvoked = invoker.getMsgNo();
							rMDBean.setNextMsgNoToProcess(highestMsgNumberInvoked+1);
							
							if(allowLaterDeliveryOfMissingMessages){
								//we also need to update the sequence OUT_OF_ORDER_RANGES property
								//so as to include our latest view of this outOfOrder range.
								//We do that here (rather than once at the end) so that we reamin
								//transactionally consistent
								Range r = new Range(firstMessageInOutOfOrderWindow,highestMsgNumberInvoked);
										
								RangeString rangeString = null;
								if(rMDBean.getOutOfOrderRanges()==null){
									//insert a new blank one one
									rangeString = new RangeString();
								}
								else{
									rangeString = rMDBean.getOutOfOrderRanges();
								}
								//update the range String with the new value
								rangeString.addRange(r);
								rMDBean.setOutOfOrderRanges(rangeString);
							}
							
							rmdBeanMgr.update(rMDBean);
						}
						
						if(transaction != null && transaction.isActive()) transaction.commit();
						transaction = null;
					}
					catch(Exception e){
						// Just log the error
						if(log.isDebugEnabled()) log.debug("Exception", e);
					} finally {
						if(transaction != null && transaction.isActive()) {
							transaction.rollback();
							transaction = null;
						}
					}
		
				}//end while
			}
		}
		finally{
//			//restart the invoker
//			finishPause();
		}
	}