in modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaOutHandler.java [55:183]
/**
 * Applies Sandesha2 processing to an outgoing message.
 * <p>
 * The method returns {@code InvocationResponse.CONTINUE} early when
 * {@code SandeshaUtil.isMessageUnreliable} reports the message as unreliable, when the
 * message context is processing a fault, or (after the execution chain has been
 * adjusted for storing) when the {@code APPLICATION_PROCESSING_DONE} property is
 * already {@code "true"}. Otherwise it sets that property, selects a message
 * processor and invokes it using a transaction obtained from the storage manager.
 *
 * @param msgCtx the outgoing message context
 * @return {@code InvocationResponse.SUSPEND} if the selected processor reports that
 *         the message was paused, otherwise {@code InvocationResponse.CONTINUE}
 * @throws AxisFault if the configuration context or the Axis service is missing, or
 *         if processing fails; an {@code AxisFault} raised during processing is
 *         rethrown as-is, any other exception is wrapped
 */
public InvocationResponse invoke(MessageContext msgCtx) throws AxisFault {
    if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
        log.debug("Enter: SandeshaOutHandler::invoke, " + msgCtx.getEnvelope().getHeader());
    }

    InvocationResponse response = InvocationResponse.CONTINUE;

    // Both the configuration context and the service must be present.
    ConfigurationContext configCtx = msgCtx.getConfigurationContext();
    if (configCtx == null) {
        String error = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
        log.debug(error);
        throw new AxisFault(error);
    }

    AxisService service = msgCtx.getAxisService();
    if (service == null) {
        String error = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.axisServiceIsNull);
        log.debug(error);
        throw new AxisFault(error);
    }

    // Messages flagged as unreliable do not need WSRM.
    if (SandeshaUtil.isMessageUnreliable(msgCtx)) {
        if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
            log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for unreliable message " + response);
        }
        return response;
    }

    // RM is not applied to fault messages either.
    if (msgCtx.isProcessingFault()) {
        if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
            log.debug("Exit: SandeshaOutHandler::invoke, Skipping sandesha processing for fault message " + response);
        }
        return response;
    }

    StorageManager storageManager =
            SandeshaUtil.getSandeshaStorageManager(configCtx, configCtx.getAxisConfiguration());

    // Changes the execution chain of this message so that it works correctly in
    // retransmissions, e.g. phases like security are removed here so they can be
    // called on each retransmission.
    SandeshaUtil.modifyExecutionChainForStoring(msgCtx, storageManager);

    // A message that has already been through application processing is passed on.
    String processedFlag = (String) msgCtx.getProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE);
    if ("true".equals(processedFlag)) {
        if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
            log.debug("Exit: SandeshaOutHandler::invoke, Application processing done " + response);
        }
        return response;
    }
    msgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");

    Transaction transaction = null;
    try {
        transaction = storageManager.getTransaction();

        // Wrap the message context in its RM representation.
        RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
        int messageType = rmMsgCtx.getMessageType();
        if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
            log.debug("Message Type: " + messageType);
        }

        // Pick the processor responsible for this message, if there is one.
        MsgProcessor processor = null;
        if (messageType != Sandesha2Constants.MessageTypes.UNKNOWN) {
            processor = MsgProcessorFactory.getMessageProcessor(rmMsgCtx);
        } else if (!msgCtx.isServerSide()) {
            // Client side.
            processor = new ApplicationMsgProcessor();
        } else {
            // Server side: a processor is only created when both inbound
            // properties are set on the message context.
            String inboundSequence =
                    (String) msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID);
            Long inboundMsgNumber =
                    (Long) msgCtx.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_MESSAGE_NUMBER);
            if (inboundSequence != null && inboundMsgNumber != null) {
                processor = new ApplicationMsgProcessor(inboundSequence, inboundMsgNumber.longValue());
            }
        }

        // Either processing path returns true when the message was paused.
        boolean paused = false;
        if (processor != null) {
            paused = processor.processOutMessage(rmMsgCtx, transaction);
        } else if (messageType == Sandesha2Constants.MessageTypes.ACK_REQUEST) {
            AckRequestedProcessor ackRequestedProcessor = new AckRequestedProcessor();
            paused = ackRequestedProcessor.processOutgoingAckRequestMessage(rmMsgCtx);
        }
        if (paused) {
            response = InvocationResponse.SUSPEND;
        }

        // The incoming thread has to wait when suspending, hence the boolean
        // property. This is done on the server side only.
        OperationContext opCtx = msgCtx.getOperationContext();
        if (msgCtx.isServerSide() && opCtx != null && response == InvocationResponse.SUSPEND) {
            if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
                log.debug("Setting HOLD_RESPONSE property");
            }
            opCtx.setProperty(RequestResponseTransport.HOLD_RESPONSE, Boolean.TRUE);
        }

        if (transaction != null && transaction.isActive()) {
            transaction.commit();
        }
        // Cleared so that the finally block does not attempt a rollback.
        transaction = null;
    } catch (Exception e) {
        // The message should not be sent in an exception situation.
        msgCtx.pause();

        // An AxisFault is rethrown unchanged; anything else is wrapped.
        throw (e instanceof AxisFault)
                ? (AxisFault) e
                : new AxisFault(
                        SandeshaMessageHelper.getMessage(SandeshaMessageKeys.outMsgError, e.toString()), e);
    } finally {
        // Roll back a transaction that is still active at this point.
        if (transaction != null && transaction.isActive()) {
            try {
                transaction.rollback();
            } catch (Exception rollbackFailure) {
                String error = SandeshaMessageHelper.getMessage(
                        SandeshaMessageKeys.rollbackError, rollbackFailure.toString());
                log.debug(error, rollbackFailure);
            }
        }
    }

    if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
        log.debug("Exit: SandeshaOutHandler::invoke " + response);
    }
    return response;
}