in modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java [160:274]
/**
 * Screens an inbound application message against the stored receiver-side
 * state (the {@code RMDBean}) of the sequence named in its Sequence header.
 *
 * <p>Inside a single storage transaction the method:
 * <ul>
 *   <li>looks up the {@code RMDBean} for the message's sequence id;</li>
 *   <li>when no bean exists, clears the message's relationships and marks the
 *       message to be handed to {@code setupDuplicateOperation};</li>
 *   <li>when a bean exists, runs the proof-of-possession assertions, rejects
 *       message number 0, sets {@code DISABLE_RESPONSE_ACK} for a LAST_MESSAGE
 *       on a sequence that has an outbound internal sequence, and marks the
 *       message as a duplicate if its number is already in the bean's
 *       completed ranges. An in-order message with no matching invoker bean
 *       and a number not below the next number to process is still let
 *       through.</li>
 * </ul>
 * The transaction is committed on normal completion and rolled back if it is
 * still active when the method exits any other way.
 *
 * @param rmMsgCtx the RM view of the incoming message; must carry a Sequence
 * @throws AxisFault a {@code SandeshaException} is raised when the message
 *         number is 0 (known sequence only) or when the message has to be
 *         redirected but no {@code AxisService} is set on the message
 *         context; calls made from here may raise others
 */
private static void processApplicationMessage(RMMsgContext rmMsgCtx) throws AxisFault {
    if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
        log.debug("Enter: SandeshaGlobalInHandler::processApplicationMessage");
    }

    // Identify the sequence and the position of this message within it.
    Sequence seqHeader = rmMsgCtx.getSequence();
    String sequenceId = seqHeader.getIdentifier().getIdentifier();
    long msgNo = seqHeader.getMessageNumber();

    StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(
            rmMsgCtx.getConfigurationContext(),
            rmMsgCtx.getConfigurationContext().getAxisConfiguration());

    Transaction tx = null;
    try {
        tx = storageManager.getTransaction();

        RMDBean rmdBean = storageManager.getRMDBeanMgr().retrieve(sequenceId);
        MessageContext msgCtx = rmMsgCtx.getMessageContext();

        // Decided by the branches below; acted upon once, after them.
        boolean rerouteToDuplicateOperation;

        if (rmdBean == null) {
            // No stored state for this sequence id.
            if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
                log.debug("Detected message for no sequence " + msgNo);
            }
            msgCtx.setRelationships(null);
            rerouteToDuplicateOperation = true;
        } else {
            // Proof-of-possession assertions: first for the SOAP body, then for
            // the Sequence header element.
            SandeshaUtil.assertProofOfPossession(rmdBean, msgCtx, msgCtx.getEnvelope().getBody());
            SandeshaUtil.assertProofOfPossession(rmdBean, msgCtx,
                    msgCtx.getEnvelope().getHeader().getFirstChildWithName(
                            new QName(rmMsgCtx.getRMNamespaceValue(),
                                    Sandesha2Constants.WSRM_COMMON.SEQUENCE)));

            if (msgNo == 0) {
                String invalidNumberText = SandeshaMessageHelper.getMessage(
                        SandeshaMessageKeys.invalidMsgNumber, Long.toString(msgNo));
                if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
                    log.debug(invalidNumberText);
                }
                throw new SandeshaException(invalidNumberText);
            }

            // Has this message number already been recorded as completed?
            RangeString completedRanges = rmdBean.getServerCompletedMessages();
            boolean alreadyCompleted = completedRanges.isMessageNumberInRanges(msgNo);

            if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.LAST_MESSAGE
                    && rmdBean.getOutboundInternalSequence() != null) {
                rmMsgCtx.setProperty(Constants.Configuration.DISABLE_RESPONSE_ACK, Boolean.TRUE);
            }

            rerouteToDuplicateOperation = alreadyCompleted;
            if (alreadyCompleted) {
                if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
                    log.debug("Detected duplicate message " + msgNo);
                }

                // Exception for in-order delivery: a completed message with no
                // invoker bean, whose number is not below the next one to
                // process, is still allowed through.
                if (SandeshaUtil.isInOrder(rmMsgCtx.getMessageContext())) {
                    InvokerBean probe = new InvokerBean();
                    probe.setMsgNo(msgNo);
                    probe.setSequenceID(sequenceId);
                    List<InvokerBean> matches = storageManager.getInvokerBeanMgr().find(probe);

                    boolean noInvokerBean = matches == null || matches.size() == 0;
                    if (noInvokerBean && rmdBean.getNextMsgNoToProcess() <= msgNo) {
                        rerouteToDuplicateOperation = false;
                        if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
                            log.debug("Allowing completed message on sequence " + sequenceId + ", msgNo " + msgNo);
                        }
                    }
                }
            }
        }

        if (rerouteToDuplicateOperation) {
            // The duplicate RM operation can only be attached once a service
            // has been resolved for the message.
            AxisService service = rmMsgCtx.getMessageContext().getAxisService();
            if (service == null) {
                throw new SandeshaException ("Duplicate message detected. But cant dispatch since the Service has not been found");
            }
            setupDuplicateOperation(rmMsgCtx);
        }

        if (tx != null && tx.isActive()) {
            tx.commit();
        }
        // Cleared so the finally block does not touch a finished transaction.
        tx = null;
    } catch (RuntimeException e) {
        if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
            log.debug("Exit: SandeshaGlobalInHandler::processApplicationMessage", e);
        }
        throw e;
    } finally {
        // Reached with a non-null transaction only when the try block did not
        // run to completion.
        if (tx != null && tx.isActive()) {
            tx.rollback();
        }
    }

    if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
        log.debug("Exit: SandeshaGlobalInHandler::processApplicationMessage");
    }
}