in modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java [101:205]
/**
 * Invokes this worker's message and then keeps looking up the next message
 * of the same sequence (message number + 1) for as long as the previous
 * invocation reported success and {@code ignoreNextMsg} is not set.
 * <p>
 * When this worker is marked as a pooled thread, each follow-on message is
 * invoked inline on the current thread, wrapped with the bean's context if a
 * {@link ContextManager} is configured. Otherwise a single new pooled
 * {@code InvokerWorker} is created for the next message; it is registered
 * with the lock and handed to the thread pool only after this worker has
 * removed its own work entry and committed the lookup transaction.
 * <p>
 * Any transaction still held by this method when it exits, whether through a
 * {@code SandeshaException} or an unchecked exception, is rolled back, and
 * the work lock is released if this worker still owns it.
 */
public void run() {
    if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::run, message " + messageNumber + ", sequence " + sequence);

    // Transaction currently owned by this method; null once it has been
    // committed or handed over to invokeMessage.
    Transaction tran = null;
    try {
        InvokerWorker nextWorker = null;
        Runnable nextRunnable = null;

        // Invoke the first message
        lastMessageInvoked = invokeMessage(null);

        // Look for the next message, so long as we are still processing normally
        while(!ignoreNextMsg && lastMessageInvoked) {
            if(log.isDebugEnabled()) log.debug("InvokerWorker:: looking for next msg to invoke");

            InvokerBean finder = new InvokerBean();
            finder.setSequenceID(sequence);
            finder.setMsgNo(messageNumber + 1);

            StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
            tran = storageManager.getTransaction();

            InvokerBeanMgr mgr = storageManager.getInvokerBeanMgr();
            InvokerBean nextBean = mgr.findUnique(finder);

            if(nextBean != null) {
                if(pooledThread) {
                    if(log.isDebugEnabled()) log.debug("InvokerWorker:: pooledThread");
                    initializeFromBean(nextBean);

                    final Transaction theTran = tran;
                    Runnable work = new Runnable() {
                        public void run() {
                            lastMessageInvoked = invokeMessage(theTran);
                        }
                    };

                    // Wrap the work with the correct context, if needed.
                    ContextManager contextMgr = SandeshaUtil.getContextManager(configurationContext);
                    if(contextMgr != null) {
                        work = contextMgr.wrapWithContext(work, nextBean.getContext());
                    }

                    // Ownership of the transaction passes to invokeMessage from
                    // here on, so forget it before running the work. This keeps
                    // the cleanup in the finally block from touching it if the
                    // work throws.
                    // NOTE(review): assumes invokeMessage completes the
                    // transaction it is given - confirm.
                    tran = null;

                    // Finally do the work
                    work.run();
                } else {
                    if(log.isDebugEnabled()) log.debug("InvokerWorker:: not pooled thread");
                    nextWorker = new InvokerWorker(configurationContext, nextBean);
                    nextWorker.setPooled();
                    nextWorker.setWorkId(workId);

                    // Wrap the invoker worker with the correct context, if needed.
                    ContextManager contextMgr = SandeshaUtil.getContextManager(configurationContext);
                    if(contextMgr != null) {
                        nextRunnable = contextMgr.wrapWithContext(nextWorker, nextBean.getContext());
                    } else {
                        nextRunnable = nextWorker;
                    }
                }
            }

            // We stop looping when there is nothing more to invoke, or when the
            // remaining work has been delegated to a new worker.
            boolean finished = (nextBean == null || nextWorker != null);

            // Remove the lock before we unlock the invokerBean (i.e. before the
            // commit below). The null check mirrors the guard used when the
            // lock is released in the finally block.
            // NOTE(review): whether lock can be null here is not visible in this
            // method - confirm.
            if(finished && lock != null) {
                lock.removeWork(workId);
            }

            // Clean up the tran, in case we didn't pass it into the invoke method
            if(tran != null) tran.commit();
            tran = null;

            if(finished) break;
        }//end while

        // If we created another worker, set it running now that we have released the lock
        if(nextWorker != null) {
            if(lock != null) {
                lock.addWork(workId, nextWorker);
            }
            configurationContext.getThreadPool().execute(nextRunnable);
        }
    } catch(SandeshaException e) {
        log.debug("Exception within InvokerWorker", e);
    } finally {
        try {
            // Clean up the tran, if there is one left. Doing this in finally
            // (rather than only when a SandeshaException is caught) also covers
            // unchecked exceptions, which would otherwise leave the transaction
            // neither committed nor rolled back.
            if(tran != null) {
                try {
                    tran.rollback();
                } catch(SandeshaException e2) {
                    log.debug("Exception rolling back tran", e2);
                }
            }
        } finally {
            // Release the lock, even if the rollback above failed unexpectedly
            if (workId !=null && lock!=null && lock.ownsLock(workId, this)) {
                lock.removeWork(workId);
            }
        }
    }

    if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run");
}