private void checkForOrphanMessages()

in modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java [570:682]


	/**
	 * Housekeeping pass over sender beans that are flagged for sending but have
	 * no transport available and whose time-to-send is older than
	 * {@link Sandesha2Constants#TRANSPORT_WAIT_TIME}.
	 * <p>
	 * For each such bean this method:
	 * <ul>
	 * <li>logs a warning, at most once per sequence ID while that ID is
	 * remembered in {@code warnedAlreadyOrphans};</li>
	 * <li>for TERMINATE_SEQ / TERMINATE_SEQ_RESPONSE beans, terminates the
	 * sending side of the sequence and deletes the sender bean;</li>
	 * <li>for every other bean, resets its time-to-send to "now" so that it is
	 * not reported again until another TRANSPORT_WAIT_TIME has elapsed.</li>
	 * </ul>
	 * Finally the {@code warnedAlreadyOrphans} map is pruned of entries older
	 * than one hour, when it holds more than 1000 entries or when ten minutes
	 * have passed since the previous pruning.
	 * <p>
	 * Any exception raised while processing is logged at debug level and
	 * swallowed; an active transaction is rolled back in that case.
	 *
	 * @param manager the storage manager used for the transactions and for all
	 *                bean lookups, updates and deletes performed here
	 * @throws SandeshaStorageException declared for callers; exceptions raised
	 *                inside the processing block are caught and logged
	 */
	private void checkForOrphanMessages(StorageManager manager) throws SandeshaStorageException {
		if (log.isDebugEnabled())
			log.debug("Enter: Sender::checkForOrphanMessages");

		Transaction tran = null;
		try {
			tran = manager.getTransaction();

			// This finder will look for beans that should have been sent, but
			// could not be sent because they need a MakeConnection message to
			// come in to pick them up. We also factor in TRANSPORT_WAIT_TIME to
			// give the MakeConnection a chance to arrive.
			SenderBean finder = new SenderBean();
			finder.setSend(true);
			finder.setTransportAvailable(false);
			finder.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);

			List<SenderBean> beans = manager.getSenderBeanMgr().find(finder);

			// Commit the lookup transaction and start a fresh one for the
			// updates/deletes below.
			tran.commit();
			tran = manager.getTransaction();

			for (SenderBean bean : beans) {

				// Emit a message to warn the user that MakeConnections are not
				// arriving to pick messages up.
				if (log.isWarnEnabled()) {
					String internalSequenceID = bean.getInternalSequenceID();
					String sequenceID = bean.getSequenceID();

					// We only want to log.warn once per orphaned sequence ID.
					if (!warnedAlreadyOrphans.containsKey(sequenceID)) {
						String message;
						if (bean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
							message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, sequenceID, internalSequenceID);
						} else {
							String messageType = Integer.toString(bean.getMessageType());
							message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPollingProtocol, messageType, sequenceID, internalSequenceID);
						}
						warnedAlreadyOrphans.put(sequenceID, System.currentTimeMillis());
						log.warn(message);
					}
				}

				// If the client shuts down too quickly, termination messages get
				// orphaned and this has an impact on performance. Delete these
				// once they have been recognised as orphans.
				int messageType = bean.getMessageType();
				if (MessageTypes.TERMINATE_SEQ == messageType || MessageTypes.TERMINATE_SEQ_RESPONSE == messageType) {
					String id = bean.getSequenceID(); // get this again as it is an error case

					// Mark the sequence as terminated. Use the manager that owns
					// the current transaction rather than a reference from
					// outside this method.
					RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(manager, id);
					if (rmsBean != null) {
						TerminateManager.terminateSendingSide(rmsBean, manager, false, null);
					} else if (log.isDebugEnabled()) {
						// NOTE(review): assumes the lookup can return null when no
						// RMS bean matches the sequence ID - confirm. In that case
						// there is nothing to terminate, but the orphaned sender
						// bean must still be removed below.
						log.debug("Sender::checkForOrphanMessages.  No RMSBean found for sequence ID : " + id);
					}

					if (log.isDebugEnabled())
						log.debug("Sender::checkForOrphanMessages.  Orphaned message of type TERMINATE_SEQ or TERMINATE_SEQ_RESPONSE found.  Deleting this message with a sequence ID of : " + id);
					// Delete the terminate sender bean.
					manager.getSenderBeanMgr().delete(bean.getMessageID());

				} else {
					// Update the bean so that we won't emit another message for
					// another TRANSPORT_WAIT_TIME.
					bean.setTimeToSend(System.currentTimeMillis());
					manager.getSenderBeanMgr().update(bean);
				}
			}

			if (tran != null && tran.isActive())
				tran.commit();
			tran = null;

			// Clean up the warnedAlreadyOrphans map when it gets big (over a
			// thousand entries) or every 10 minutes, deleting everything over an
			// hour old. This runs once per pass, after the commit, so a failure
			// here cannot roll back the bean updates above.
			long currentTime = System.currentTimeMillis();
			if (lastRanCleanup == 0) {
				lastRanCleanup = currentTime;
			}
			if (warnedAlreadyOrphans.size() > 1000 || currentTime > (lastRanCleanup + 600000)) {
				if (log.isDebugEnabled())
					log.debug("Sender::checkForOrphanMessages.  Cleaning up list of orphans");
				long timeAnHourAgo = currentTime - 3600000;
				Iterator<String> it = warnedAlreadyOrphans.keySet().iterator();
				while (it.hasNext()) {
					Object key = it.next();
					long warnedAt = ((Long) warnedAlreadyOrphans.get(key)).longValue();
					if (warnedAt < timeAnHourAgo) {
						// Remove through the iterator: removing via the map while
						// iterating its key set is a concurrent modification.
						it.remove();
					}
				}
				lastRanCleanup = System.currentTimeMillis();
			}

		} catch (Exception e) {
			// There isn't much we can do here, so log the exception and
			// continue.
			if (log.isDebugEnabled())
				log.debug("Exception", e);
		} finally {
			if (tran != null && tran.isActive())
				tran.rollback();
		}

		if (log.isDebugEnabled())
			log.debug("Exit: Sender::checkForOrphanMessages");
	}