private static boolean cleanSendingSideData()

in modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java [259:392]


	/**
	 * Removes the sender-side data stored for an outgoing sequence and, when requested,
	 * tries to move the unacknowledged application messages onto a new sequence.
	 * <p>
	 * Every sender bean found for the internal sequence id is visited. Terminate-sequence
	 * messages are left alone unless the RMS bean reports that it is already terminated.
	 * When not reallocating, each visited bean and its stored message context are deleted.
	 * When reallocating, only application messages whose number directly follows the last
	 * acknowledged one are collected (and deleted from storage); other messages are kept.
	 * <p>
	 * Reallocation is only attempted when the client completed messages form exactly one
	 * range. Otherwise the RMS bean is marked {@code REALLOCATION_FAILED} and the method
	 * falls back to plain deletion.
	 *
	 * @param internalSequenceId   internal id used to look up the sender beans
	 * @param storageManager       gives access to the bean managers, message store and transactions
	 * @param rmsBean              the sending-side sequence bean; its reallocation state may be updated
	 * @param reallocateIfPossible whether unacknowledged messages should be reallocated to a new sequence
	 * @param transaction          transaction handed on to the reallocation helper
	 * @return {@code true} if the reallocation helper returned without throwing, {@code false} otherwise
	 *         (including when no reallocation was requested or possible)
	 * @throws SandeshaException if storage access fails, or a dummy message cannot be created
	 */
	private static boolean cleanSendingSideData(String internalSequenceId, StorageManager storageManager, 
			RMSBean rmsBean, boolean reallocateIfPossible, Transaction transaction) throws SandeshaException {

		if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
			log.debug("Enter: TerminateManager::cleanSendingSideData " + internalSequenceId + ", " + reallocateIfPossible);
		
		boolean reallocatedOK = false;
		SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();

		// removing retransmitterMgr entries and corresponding message contexts.
		Collection<SenderBean> senderBeans = retransmitterBeanMgr.find(internalSequenceId);
		List<MessageContext> msgsToReallocate = null;
		if(reallocateIfPossible){
			msgsToReallocate = new LinkedList<MessageContext>();
		}
		Range[] ranges = rmsBean.getClientCompletedMessages().getRanges();
		long lastAckedMsg = -1;
		
		if(ranges.length==1){
			//the sequence is a single contiguous acked range
			lastAckedMsg = ranges[0].upperValue;
		} else if(reallocateIfPossible){
			//cannot reallocate as there are gaps
			rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
			storageManager.getRMSBeanMgr().update(rmsBean);
			reallocateIfPossible=false;
			if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
				log.debug("cannot reallocate sequence as there are gaps");
		}
		
		for (SenderBean retransmitterBean : senderBeans) {
			int messageType = retransmitterBean.getMessageType();
			if(messageType==Sandesha2Constants.MessageTypes.TERMINATE_SEQ && !rmsBean.isTerminated()){
				//keep the terminate sequence message while the sequence is not yet terminated
				continue;
			}

			String messageStoreKey = retransmitterBean.getMessageContextRefKey();
			//if we have been asked to reallocate we need to send all unacked messages to a new sequence.
			//We must ensure that we retrieve these messages in the correct order
			if(reallocateIfPossible
				&& messageType==Sandesha2Constants.MessageTypes.APPLICATION
				&& retransmitterBean.getMessageNumber()==lastAckedMsg+1){
				
				if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
					log.debug("adding message for reallocate: " + retransmitterBean.getMessageNumber());
				
				//reallocate the application message that is next in the outgoing order, then
				//advance the marker so the following message number becomes eligible
				msgsToReallocate.add(storageManager.retrieveMessageContext(messageStoreKey, storageManager.getContext()));
				retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
				storageManager.removeMessageContext(messageStoreKey);	
				lastAckedMsg++;
			}
			else if(reallocateIfPossible){
				//we are reallocating but this message does not fit the criteria. We should not delete it
				if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
					log.debug("cannot reallocate: " + retransmitterBean.getMessageNumber());
				if(msgsToReallocate.isEmpty()){
					try{
						//however we might need this message if there are no messages to reallocate but we still
						//need a new sequence - we use a dummy message
						MessageContext dummy = SandeshaUtil.cloneMessageContext(
								storageManager.retrieveMessageContext(messageStoreKey, storageManager.getContext()));
						dummy.getOptions().setProperty(SandeshaClientConstants.DUMMY_MESSAGE, Sandesha2Constants.VALUE_TRUE);	
						msgsToReallocate.add(dummy);							
					}
					catch(Exception e){
						if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
							log.debug("Exit: TerminateManager::cleanSendingSideData " + e);
						throw new SandeshaStorageException(e);
					}
				}
			}
			else{
				//we are not reallocating so just delete the messages
				retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
				storageManager.removeMessageContext(messageStoreKey);						
			}
		}
		
		if(reallocateIfPossible){
			try{
				SandeshaUtil.reallocateMessagesToNewSequence(storageManager, rmsBean, msgsToReallocate, transaction);	
				// NOTE(review): this flag stays true even if the bean update or commit below throws and
				// the catch block marks the bean as REALLOCATION_FAILED - confirm that is intended.
				reallocatedOK = true;
			
				//If the reallocation was successful and the RMSBean being reallocated was originally created for reallocation
				//the RMSBean can be deleted.
				transaction = storageManager.getTransaction();
				if(rmsBean.isReallocated() == Sandesha2Constants.WSRM_COMMON.ORIGINAL_REALLOCATED_BEAN_COMPLETE){
					rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED);
					storageManager.getRMSBeanMgr().update(rmsBean);
				}
				
				if(transaction != null && transaction.isActive()) {
					transaction.commit();
				}
				//clear the reference so the finally block does not roll back the committed work
				transaction = null;
			} catch(Exception e){
				
				//The warning must not depend on debug tracing being switched on, otherwise a failed
				//reallocation goes unreported in a normally configured system.
				if(log.isWarnEnabled())
					log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.reallocationFailed, rmsBean.getSequenceID()));
				
				if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
					log.debug("Reallocation failed for messages on sequence " + rmsBean.getSequenceID() + " because of " + e);
			
				//Reallocation Failed
				//Need to mark any RMSBeans involved as failed so that we don't attempt to send
				//anymore messages on these seq's.  The client will have to manually reallocate and
				//administer the sequences.
				transaction = storageManager.getTransaction();
				
				rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
				storageManager.getRMSBeanMgr().update(rmsBean);
				
				String intSeqIDOfOriginallyReallocatedSeq = rmsBean.getInternalSeqIDOfSeqUsedForReallocation();
				if(intSeqIDOfOriginallyReallocatedSeq != null){
					RMSBean origRMSBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, intSeqIDOfOriginallyReallocatedSeq);
					//the original bean may no longer be stored; a null here must not abort the
					//failure handling and roll back the update made to rmsBean above
					if(origRMSBean != null){
						origRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
						storageManager.getRMSBeanMgr().update(origRMSBean);
					}
				}
				
				if(transaction != null && transaction.isActive()) {
					transaction.commit();
				}
				transaction = null;
				
			} finally {
				//anything still active at this point was not committed, so undo it
				if (transaction != null && transaction.isActive()) {
					transaction.rollback();
				}
			}		
		}
		
		if(LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
			log.debug("Exit: TerminateManager::cleanSendingSideData " + reallocatedOK);
		return reallocatedOK;
	}