in modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java [207:383]
private boolean invokeMessage(Transaction tran) {
if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::invokeMessage");
Transaction transaction = null;
MessageContext msgToInvoke = null;
boolean messageInvoked = true;
// If we are not the holder of the correct lock, then we have to stop
if(lock != null && (!lock.ownsLock(workId, this))) {
if (log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run, another worker holds the lock");
return false;
}
try {
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
InvokerBeanMgr invokerBeanMgr = storageManager.getInvokerBeanMgr();
//starting a transaction
if(tran == null) {
transaction = storageManager.getTransaction();
} else {
transaction = tran;
}
InvokerBean invokerBean = invokerBeanMgr.retrieve(messageContextKey);
msgToInvoke = storageManager.retrieveMessageContext(messageContextKey, configurationContext);
if(msgToInvoke==null){
//return since there is nothing to do
if(log.isDebugEnabled()) log.debug("null msg");
return false;
}
// ending the transaction before invocation.
if(transaction != null) {
transaction.commit();
transaction = storageManager.getTransaction();
}
RMMsgContext rmMsg = MsgInitializer.initializeMessage(msgToInvoke);
// Lock the RMD Bean just to avoid deadlocks
RMDBean rMDBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, invokerBean.getSequenceID());
boolean highestMessage = false;
if(!ignoreNextMsg){
// updating the next msg to invoke
long nextMsgNo = rMDBean.getNextMsgNoToProcess();
if (!(invokerBean.getMsgNo()==nextMsgNo)) {
//someone else has invoked this before us - this run should now stop
if(log.isDebugEnabled()) log.debug("Operated message number is different from the Next Message Number to invoke");
return false;
}
nextMsgNo++;
rMDBean.setNextMsgNoToProcess(nextMsgNo);
storageManager.getRMDBeanMgr().update(rMDBean);
}
// Check if this is the last message
if (rmMsg.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
Sequence sequence = rmMsg.getSequence();
if (sequence.getLastMessage()) {
//this will work for RM 1.0 only
highestMessage = true;
} else {
if (rMDBean!=null && rMDBean.isTerminated()) {
long highestInMsgNo = rMDBean.getHighestInMessageNumber();
if (invokerBean.getMsgNo()==highestInMsgNo)
highestMessage = true;
}
}
}
// Depending on the transaction support, the service will be invoked only once.
// Therefore we delete the invoker bean and message now, ahead of time
invokerBeanMgr.delete(messageContextKey);
// removing the corresponding message context as well.
storageManager.removeMessageContext(messageContextKey);
try {
boolean postFailureInvocation = false;
// StorageManagers should st following property to
// true, to indicate that the message received comes
// after a failure.
String postFaulureProperty = (String) msgToInvoke
.getProperty(Sandesha2Constants.POST_FAILURE_MESSAGE);
if (postFaulureProperty != null
&& Sandesha2Constants.VALUE_TRUE.equals(postFaulureProperty))
postFailureInvocation = true;
InvocationResponse response = null;
if (postFailureInvocation) {
makeMessageReadyForReinjection(msgToInvoke);
if (log.isDebugEnabled())
log.debug("Receiving message, key=" + messageContextKey + ", msgCtx="
+ msgToInvoke.getEnvelope().getHeader());
response = AxisEngine.receive(msgToInvoke);
} else {
if (log.isDebugEnabled())
log.debug("Resuming message, key=" + messageContextKey + ", msgCtx="
+ msgToInvoke.getEnvelope().getHeader());
msgToInvoke.setPaused(false);
response = AxisEngine.resumeReceive(msgToInvoke);
}
if(!InvocationResponse.SUSPEND.equals(response)) {
// Performance work - need to close the XMLStreamReader to prevent GC thrashing.
SOAPEnvelope env = msgToInvoke.getEnvelope();
if(env!=null){
StAXBuilder sb = (StAXBuilder)msgToInvoke.getEnvelope().getBuilder();
if(sb!=null){
sb.close();
}
}
}
if (transaction != null && transaction.isActive()) {
transaction.commit();
transaction = storageManager.getTransaction();
}
if (highestMessage) {
//do cleaning stuff that hs to be done after the invocation of the last message.
TerminateManager.cleanReceivingSideAfterInvocation(invokerBean.getSequenceID(), storageManager);
// exit from current iteration. (since an entry
// was removed)
if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::invokeMessage Last message return " + messageInvoked);
return messageInvoked;
}
} catch (SandeshaStorageTransientException e){
if (log.isDebugEnabled())
log.debug("SandeshaStorageTransientException :", e);
if (transaction != null && transaction.isActive())
transaction.rollback();
messageInvoked = false;
} catch (Exception e) {
if (log.isDebugEnabled())
log.debug("Exception :", e);
if (transaction != null && transaction.isActive())
transaction.rollback();
messageInvoked = false;
handleFault(rmMsg, e);
}
if(transaction != null && transaction.isActive()) transaction.commit();
transaction = null;
} catch (Exception e) {
if (log.isErrorEnabled())
log.error(e.toString(), e);
messageInvoked = false;
} finally {
if (transaction!=null && transaction.isActive()) {
try {
transaction.rollback();
} catch (SandeshaStorageException e) {
if (log.isWarnEnabled())
log.warn("Caught exception rolling back transaction", e);
}
}
}
if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::invokeMessage " + messageInvoked);
return messageInvoked;
}