in modules/core/src/main/java/org/apache/sandesha2/msgreceivers/RMMessageReceiver.java [54:124]
/**
 * Handles a message that has been dispatched to this RM message receiver.
 *
 * <p>The RM view of the message is taken from the {@code RM_MESSAGE_CONTEXT} property of
 * {@code msgCtx} when that property is set; otherwise it is created through
 * {@code MsgInitializer.initializeMessage}. If {@code MsgProcessorFactory} supplies a processor
 * for the message, the processor is run with a transaction obtained from the storage manager.
 * The transaction is committed after successful processing and rolled back if it is still
 * active when processing ends abnormally.
 *
 * <p>When the target service has the {@code UNRELIABLE_MESSAGE} parameter set to
 * {@code "true"}, the processor is not run: a CreateSequence message is passed to
 * {@code FaultManager.makeCreateSequenceRefusedFault} and the method returns, while any other
 * message type results in an {@code AxisFault}.
 *
 * @param msgCtx the message context being received
 * @throws AxisFault if the service has reliable messaging disabled (for non-CreateSequence
 *         messages), or if processing fails; a non-{@code AxisFault} failure is wrapped with
 *         the original exception as its cause
 */
public final void invokeBusinessLogic(MessageContext msgCtx) throws AxisFault {
    if (log.isDebugEnabled()) {
        log.debug("Entry: RMMessageReceiver::invokeBusinessLogic");
    }

    // Fetch the property once and reuse the result for both the null check and the cast.
    Object storedRMMsgCtx = msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.RM_MESSAGE_CONTEXT);
    RMMsgContext rmMsgCtx;
    if (storedRMMsgCtx != null) {
        rmMsgCtx = (RMMsgContext) storedRMMsgCtx;
    } else {
        rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
    }

    if (log.isDebugEnabled()) {
        log.debug("MsgReceiver got type: " + SandeshaUtil.getMessageTypeString(rmMsgCtx.getMessageType()));
    }

    // Note that some messages (such as stand-alone acks) will be routed here, but
    // the headers will already have been processed. Therefore we should not assume
    // that we will have a MsgProcessor every time.
    MsgProcessor msgProcessor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
    if (msgProcessor != null) {

        if (msgCtx.getAxisService() != null) {
            Parameter unreliableParam = msgCtx.getAxisService().getParameter(SandeshaClientConstants.UNRELIABLE_MESSAGE);
            if (unreliableParam != null && "true".equals(unreliableParam.getValue())) {
                String notEnabledMessage = SandeshaMessageHelper.getMessage(
                        SandeshaMessageKeys.reliableMessagingNotEnabled, msgCtx.getAxisService().getName());

                // Only a CreateSequence request gets a CreateSequenceRefused fault;
                // every other RM message for this service is rejected outright.
                if (rmMsgCtx.getMessageType() != Sandesha2Constants.MessageTypes.CREATE_SEQ) {
                    throw new AxisFault(notEnabledMessage);
                }

                FaultManager.makeCreateSequenceRefusedFault(rmMsgCtx, notEnabledMessage, new Exception(), null);
                if (log.isDebugEnabled()) {
                    log.debug("Exit: RMMessageReceiver::invokeBusinessLogic, Service has disabled RM ");
                }
                return;
            }
        }

        Transaction transaction = null;
        try {
            ConfigurationContext context = msgCtx.getConfigurationContext();
            StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(context, context.getAxisConfiguration());
            transaction = storageManager.getTransaction();

            msgProcessor.processInMessage(rmMsgCtx, transaction);

            if (transaction != null && transaction.isActive()) {
                transaction.commit();
            }
            // Clearing the reference tells the finally block that no rollback is needed.
            transaction = null;
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug("Exception caught during processInMessage", e);
            }
            // The message should not be sent on in an exception situation.
            msgCtx.pause();

            if (e instanceof AxisFault) {
                throw (AxisFault) e;
            }
            String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.inMsgError, e.toString());
            throw new AxisFault(message, e);
        } finally {
            // Still non-null here means processing or the commit itself failed.
            if (transaction != null && transaction.isActive()) {
                try {
                    transaction.rollback();
                } catch (Exception e1) {
                    // Rollback stays best-effort so the original failure keeps propagating,
                    // but a failed rollback must be visible outside debug logging.
                    String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1.toString());
                    log.error(message, e1);
                }
            }
        }
    }

    if (log.isDebugEnabled()) {
        log.debug("Exit: RMMessageReceiver::invokeBusinessLogic");
    }
}