protected boolean internalRun()

in modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java [103:316]


	/**
	 * Performs one pass of the sender loop.
	 * <p>
	 * Each call selects the next sequence in round-robin order (tracked by
	 * {@code nextIndex}), checks that the sequence is still usable, sends any
	 * acknowledgement that has become due for it, and hands the next pending
	 * sender bean for that sequence to a {@link SenderWorker} on the thread
	 * pool. When the index has run past the end of the sequence list it is
	 * reset and, at most once per {@code HOUSEKEEPING_INTERVAL}, housekeeping
	 * is carried out instead of picking a sequence.
	 * <p>
	 * Exceptions raised while doing this are logged at debug level and not
	 * propagated; any transaction that is still active at that point is rolled
	 * back.
	 *
	 * @return {@code true} if the caller should sleep before calling again
	 *         (a full loop over the sequences found nothing to process, or
	 *         the chosen message is already assigned to a worker);
	 *         {@code false} otherwise
	 */
	protected boolean internalRun() {
		if (log.isDebugEnabled())
			log.debug("Enter: Sender::internalRun");

		Transaction transaction = null;
		boolean sleep = false;

		try {
			// Pick a sequence using a round-robin approach
			ArrayList<SequenceEntry> allSequencesList = getSequences();
			int size = allSequencesList.size();

			if (log.isDebugEnabled())
				log.debug("Choosing one from " + size + " sequences");
			if (nextIndex >= size) {
				nextIndex = 0;

				// We just looped over the set of sequences. If we didn't
				// process any messages on this loop then we sleep before the
				// next one.
				if (size == 0 || !processedMessage) {
					sleep = true;
				}
				processedMessage = false;

				if (System.currentTimeMillis() - lastHousekeeping > HOUSEKEEPING_INTERVAL) {
					// At this point - delete any sequences that have timed out,
					// or been terminated.
					deleteTerminatedSequences(storageManager);

					// Also clean up any sender beans that are not yet eligible
					// for sending, but are blocking the transport threads.
					unblockTransportThreads(storageManager);

					// Finally, check for messages that can only be serviced by
					// polling, and warn the user if they are too old.
					checkForOrphanMessages(storageManager);
					lastHousekeeping = System.currentTimeMillis();
				}
				if (log.isDebugEnabled())
					log.debug("Exit: Sender::internalRun, looped over all sequences, sleep " + sleep);
				return sleep;
			}

			transaction = storageManager.getTransaction();

			SequenceEntry entry = allSequencesList.get(nextIndex++);
			String sequenceId = entry.getSequenceId();
			if (log.isDebugEnabled())
				log.debug("Chose sequence " + sequenceId);

			String rmVersion = null;
			// Check that the sequence is still valid
			boolean found = false;
			if (entry.isRmSource()) {
				RMSBean rms = null;

				// Try the key cached on the entry first.
				if (entry.rmsKey != null) {
					rms = storageManager.getRMSBeanMgr().retrieve(entry.rmsKey);
					if (rms == null && log.isDebugEnabled())
						log.debug("RMS bean is null - checking using findUnique");
				}

				// No cached key, or the cached key no longer resolves: look the
				// bean up by internal sequence id and (re)cache its key so that
				// later passes do not repeat the failed retrieve.
				if (rms == null) {
					RMSBean matcher = new RMSBean();
					matcher.setInternalSequenceID(sequenceId);
					matcher.setTerminated(false);
					rms = storageManager.getRMSBeanMgr().findUnique(matcher);
					if (rms != null) {
						entry.rmsKey = rms.getCreateSeqMsgID();
					}
				}

				if (rms != null && !rms.isTerminated() && !rms.isTimedOut()) {
					// From here on sequenceId is the bean's sequence id rather
					// than the id taken from the entry.
					sequenceId = rms.getSequenceID();
					if (SequenceManager.hasSequenceTimedOut(rms, sequenceId, storageManager))
						SequenceManager.finalizeTimedOutSequence(rms.getInternalSequenceID(), null, storageManager);
					else
						found = true;
					rmVersion = rms.getRMVersion();
				}

			} else {
				RMDBean rmd = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
				if (rmd != null && !rmd.isTerminated()) {
					found = true;
					rmVersion = rmd.getRMVersion();
				}
			}

			if (!found) {
				// NOTE(review): for an RM source that has just timed out,
				// sequenceId was reassigned above and is no longer
				// entry.getSequenceId() - confirm this is the id
				// stopThreadForSequence expects.
				stopThreadForSequence(sequenceId, entry.isRmSource());
				if (log.isDebugEnabled())
					log.debug("Exit: Sender::internalRun, sequence has ended");

				if (transaction != null && transaction.isActive()) {
					transaction.commit();
					transaction = null;
				}

				return false;
			}

			if (sequenceId != null) {
				AckHolder acktts = (AckHolder) ackMap.get(sequenceId);
				// An acknowledgement is held for this sequence and its time to
				// send has passed: turn it into an ack bean entry now.
				if (acktts != null && acktts.tts < System.currentTimeMillis()) {
					ackMap.remove(sequenceId);
					RMDBean rmd = storageManager.getRMDBeanMgr().retrieve(sequenceId);
					if (rmd != null) {
						RMMsgContext ackRMMsgContext = AcknowledgementManager.generateAckMessage(acktts.refMsg, rmd, sequenceId, storageManager, true);

						AcknowledgementManager.addAckBeanEntry(ackRMMsgContext, sequenceId, acktts.tts, storageManager);

						// Commit the ack entry and carry on in a fresh
						// transaction. Guarded like every other commit in this
						// method.
						if (transaction != null && transaction.isActive()) {
							transaction.commit();
						}
						transaction = storageManager.getTransaction();
					}
				}
			}

			SenderBeanMgr mgr = storageManager.getSenderBeanMgr();
			SenderBean senderBean = mgr.getNextMsgToSend(sequenceId);

			if (senderBean == null) {
				if (log.isDebugEnabled())
					log.debug("Exit: Sender::internalRun, no message for this sequence");

				if (transaction != null && transaction.isActive()) {
					transaction.commit();
					transaction = null;
				}

				return false; // Move on to the next sequence in the list
			}

			// work Id is used to define the piece of work that will be
			// assigned to the Worker thread, to handle this Sender bean.

			// workId contains a timeToSend part to cater for retransmissions.
			// This will cause retransmissions to be treated as new work.
			String workId = senderBean.getMessageID() + senderBean.getTimeToSend();

			// check whether the bean is already assigned to a worker.
			if (getWorkerLock().isWorkPresent(workId)) {
				// As there is already a worker running we are probably looping
				// too fast, so sleep on the next loop.
				if (log.isDebugEnabled()) {
					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, workId);
					log.debug("Exit: Sender::internalRun, " + message + ", sleeping");
				}

				if (transaction != null && transaction.isActive()) {
					transaction.commit();
					transaction = null;
				}

				return true;
			}

			// committing the transaction here to release resources early.
			if (transaction != null && transaction.isActive())
				transaction.commit();
			transaction = null;

			// start a worker which will work on this message.
			SenderWorker worker = new SenderWorker(context, senderBean, rmVersion);
			worker.setLock(getWorkerLock());
			worker.setWorkId(workId);

			try {
				// Set the lock up before we start the thread, but roll it back
				// if we hit any problems
				getWorkerLock().addWork(workId, worker);
				threadPool.execute(worker);
			} catch (Exception e) {
				getWorkerLock().removeWork(workId);
				// Do not lose the failure silently; logged at debug level to
				// match the other error paths of this method.
				if (log.isDebugEnabled())
					log.debug("Sender::internalRun, failed to start worker for work " + workId, e);
			}

			// If we got to here then we found work to do on the sequence, so we
			// should remember not to sleep at the end of the list of sequences.
			// NOTE(review): this is also set when starting the worker failed
			// above - confirm that retrying without sleeping is intended.
			processedMessage = true;

		} catch (Exception e) {

			// TODO : when this is the client side throw the exception to
			// the client when necessary.

			// TODO rollback only if a SandeshaStorageException.
			// This allows the other Exceptions to be used within the Normal
			// flow.

			String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendMsgError, e.toString());
			log.debug(message, e);
		} finally {
			// Anything still active here was not committed on a normal path,
			// so roll it back.
			if (transaction != null && transaction.isActive()) {
				try {
					transaction.rollback();
					transaction = null;
				} catch (Exception e) {
					String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e.toString());
					log.debug(message, e);
				}
			}
		}
		if (log.isDebugEnabled())
			log.debug("Exit: Sender::internalRun, not sleeping");
		return false;
	}