in modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java [209:374]
/**
 * Runs a single pass of the invoker.
 *
 * <p>One sequence is chosen per call in round-robin order (tracked by
 * {@code nextIndex}). If an invoker bean is available for that sequence and
 * no worker currently holds the sequence's work id, the storage transaction
 * is committed and an {@link InvokerWorker} is handed to the thread pool.
 *
 * <p>Any exception raised during the pass is logged at debug level and not
 * propagated; a transaction that is still active at that point is rolled
 * back in the {@code finally} block.
 *
 * @return the sleep flag: {@code true} when a full loop over the sequences
 *         processed nothing, when there are no sequences, when the chosen
 *         sequence already has a worker assigned, or when the next message
 *         number is invalid; {@code false} otherwise. The in-code comments
 *         indicate the caller sleeps before the next pass when this is
 *         {@code true} (the caller is not visible from this method).
 */
protected boolean internalRun() {
    if (log.isDebugEnabled()) log.debug("Enter: Invoker::internalRun");

    boolean sleep = false;
    Transaction transaction = null;

    try {
        RMDBeanMgr nextMsgMgr = storageManager.getRMDBeanMgr();
        InvokerBeanMgr storageMapMgr = storageManager.getInvokerBeanMgr();

        transaction = storageManager.getTransaction();

        // Pick a sequence using a round-robin approach
        List<SequenceEntry> allSequencesList = getSequences();
        int size = allSequencesList.size();
        if (log.isDebugEnabled()) log.debug("Choosing one from " + size + " sequences");

        if (nextIndex >= size) {
            nextIndex = 0;

            // We just looped over the set of sequences. If we didn't process any
            // messages on this loop then we sleep before the next one
            if (size == 0 || !processedMessage) {
                sleep = true;
            }
            processedMessage = false;

            if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, looped over all sequences, sleep " + sleep);

            if (transaction != null && transaction.isActive()) transaction.commit();
            transaction = null;

            return sleep;
        }

        SequenceEntry entry = allSequencesList.get(nextIndex++);
        String sequenceId = entry.getSequenceId();
        if (log.isDebugEnabled()) log.debug("Chose sequence " + sequenceId);

        RMDBean nextMsgBean = nextMsgMgr.retrieve(sequenceId);
        if (nextMsgBean == null) {
            if (log.isDebugEnabled()) log.debug("Next message not set correctly. Removing invalid entry.");

            stopThreadForSequence(sequenceId, entry.isRmSource());

            // Re-read the sequence list after the removal; with nothing left
            // to service there is no point in spinning.
            allSequencesList = getSequences();
            if (allSequencesList.size() == 0) {
                sleep = true;
            }

            if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, sleep " + sleep);

            if (transaction != null && transaction.isActive()) transaction.commit();
            transaction = null;

            return sleep;
        }

        long nextMsgno = nextMsgBean.getNextMsgNoToProcess();
        if (nextMsgno <= 0) {
            // Make sure we sleep on the next loop, so that we don't spin in a tight loop
            sleep = true;
            if (log.isDebugEnabled()) {
                log.debug("Invalid Next Message Number " + nextMsgno);
            }
            String message = SandeshaMessageHelper.getMessage(
                    SandeshaMessageKeys.invalidMsgNumber, Long.toString(nextMsgno));
            // Caught by the outer catch below; the finally block then rolls
            // back the still-active transaction.
            throw new SandeshaException(message);
        }

        InvokerBean selector = new InvokerBean();
        selector.setSequenceID(sequenceId);
        selector.setMsgNo(nextMsgno);
        List<InvokerBean> invokerBeans = storageMapMgr.find(selector);

        //add any msgs that belong to out of order windows
        addOutOfOrderInvokerBeansToList(sequenceId, storageManager, invokerBeans);

        // If there aren't any beans to process then move on to the next sequence
        if (invokerBeans.size() == 0) {
            if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, no beans to invoke on sequence " + sequenceId + ", sleep " + sleep);

            if (transaction != null && transaction.isActive()) transaction.commit();
            transaction = null;

            return sleep;
        }

        //TODO correct the locking mechanism to have one lock per sequence.
        //TODO should this be a loop over all beans, rather than just the first?
        // The list is known to be non-empty here, so only the first bean is
        // dispatched on this pass.
        InvokerBean bean = invokerBeans.get(0);

        //see if this is an out of order msg
        boolean beanIsOutOfOrderMsg = bean.getMsgNo() != nextMsgno;

        // The sequence id doubles as the work id, so at most one worker is
        // registered per sequence at a time.
        String workId = sequenceId;

        //check whether the bean is already assigned to a worker.
        if (getWorkerLock().isWorkPresent(workId)) {
            // As there is already a worker assigned we are probably dispatching
            // messages too quickly, so we sleep before trying the next sequence.
            sleep = true;
            String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, workId);
            if (log.isDebugEnabled()) log.debug("Exit: Invoker::internalRun, " + message + ", sleep " + sleep);

            if (transaction != null && transaction.isActive()) transaction.commit();
            transaction = null;

            return sleep;
        }

        // Finish with this transaction before the worker is started.
        if (transaction != null && transaction.isActive()) transaction.commit();
        transaction = null;

        // start a new worker thread and let it do the invocation.
        InvokerWorker worker = new InvokerWorker(context, bean);
        if (beanIsOutOfOrderMsg) worker.forceOutOfOrder();
        worker.setPooled();
        worker.setWorkId(workId);

        // Wrap the invoker worker with the correct context, if needed.
        Runnable work = worker;
        ContextManager contextMgr = SandeshaUtil.getContextManager(context);
        if (contextMgr != null) {
            work = contextMgr.wrapWithContext(work, bean.getContext());
        }

        try {
            // Set the lock up before we start the thread, but roll it back
            // if we hit any problems
            if (worker.getLock().addWork(workId, worker)) {
                threadPool.execute(work);
            }
        } catch (Exception e) {
            // Record the failure before releasing the lock so that a dispatch
            // problem is not lost silently.
            if (log.isDebugEnabled()) log.debug("Failed to dispatch invoker worker for " + workId + ", releasing work lock", e);
            worker.getLock().removeWork(workId);
        }

        // NOTE(review): this is set even when the dispatch above failed or
        // addWork returned false — confirm whether that is intended.
        processedMessage = true;
    } catch (Exception e) {
        String message = SandeshaMessageHelper
                .getMessage(SandeshaMessageKeys.invokeMsgError);
        if (log.isDebugEnabled()) log.debug(message, e);
    } finally {
        // Only reached with a non-null transaction when an exception skipped
        // one of the commits above.
        if (transaction != null && transaction.isActive()) {
            try {
                transaction.rollback();
            } catch (Exception e) {
                String message = SandeshaMessageHelper.getMessage(
                        SandeshaMessageKeys.rollbackError, e.toString());
                if (log.isDebugEnabled()) log.debug(message, e);
            }
        }
    }

    if (log.isDebugEnabled()) {
        log.debug("Exit: Invoker::internalRun");
    }
    return sleep;
}