in modules/core/src/main/java/org/apache/sandesha2/msgprocessors/ApplicationMsgProcessor.java [727:833]
private void processResponseMessage(RMMsgContext rmMsg, RMSBean rmsBean, String internalSequenceId, String outSequenceID, long messageNumber,
String storageKey, StorageManager storageManager, Transaction tran, boolean hasUserTransaction) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: ApplicationMsgProcessor::processResponseMessage, " + internalSequenceId + ", " + outSequenceID);
MessageContext msg = rmMsg.getMessageContext();
SenderBeanMgr retransmitterMgr = storageManager.getSenderBeanMgr();
// setting last message
boolean lastMessage = false;
if (msg.isServerSide()) {
Boolean inboundLast = (Boolean) msg.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_LAST_MESSAGE);
if (inboundLast != null && inboundLast.booleanValue()) {
lastMessage = true;
}
} else {
// client side
Object obj = msg.getProperty(SandeshaClientConstants.LAST_MESSAGE);
if (obj != null && "true".equals(obj)) {
lastMessage = true;
}
}
boolean sendingNow = false;
if(outSequenceID != null && !hasUserTransaction) {
sendingNow = true;
}
// Now that we have decided which sequence to use for the message, make sure that we secure
// it with the correct token.
RMMsgCreator.secureOutboundMessage(rmsBean, msg);
// Retransmitter bean entry for the application message
SenderBean appMsgEntry = new SenderBean();
appMsgEntry.setMessageContextRefKey(storageKey);
appMsgEntry.setTimeToSend(System.currentTimeMillis());
appMsgEntry.setMessageID(rmMsg.getMessageId());
appMsgEntry.setMessageNumber(messageNumber);
appMsgEntry.setLastMessage(lastMessage);
SOAPEnvelope envelope = rmMsg.getSOAPEnvelope();
if (lastMessage && envelope!=null && envelope.getBody().getFirstOMChild()==null)
appMsgEntry.setMessageType(Sandesha2Constants.MessageTypes.LAST_MESSAGE);
else
appMsgEntry.setMessageType(Sandesha2Constants.MessageTypes.APPLICATION);
appMsgEntry.setInboundSequenceId(inboundSequence);
appMsgEntry.setInboundMessageNumber(inboundMessageNumber);
if (outSequenceID == null) {
appMsgEntry.setSend(false);
} else {
appMsgEntry.setSend(true);
// Send will be set to true at the sender.
msg.setProperty(Sandesha2Constants.SET_SEND_TO_TRUE, Sandesha2Constants.VALUE_TRUE);
appMsgEntry.setSequenceID(outSequenceID);
}
EndpointReference to = rmMsg.getTo();
if (to!=null)
appMsgEntry.setToAddress(to.getAddress());
appMsgEntry.setInternalSequenceID(internalSequenceId);
msg.setProperty(Sandesha2Constants.QUALIFIED_FOR_SENDING, Sandesha2Constants.VALUE_FALSE);
// increasing the current handler index, so that the message will not be
// going throught the SandeshaOutHandler again.
msg.setCurrentHandlerIndex(msg.getCurrentHandlerIndex() + 1);
SandeshaUtil.executeAndStore(rmMsg, storageKey, storageManager);
// Insert the SenderBean
retransmitterMgr.insert(appMsgEntry);
// Lock the sender bean before we insert it, if we are planning to send it ourselves
SenderWorker worker = null;
if(sendingNow) {
String workId = appMsgEntry.getMessageID() + appMsgEntry.getTimeToSend();
SandeshaThread sender = storageManager.getSender();
ConfigurationContext context = msg.getConfigurationContext();
WorkerLock lock = sender.getWorkerLock();
worker = new SenderWorker(context, appMsgEntry, rmsBean.getRMVersion());
worker.setLock(lock);
worker.setWorkId(workId);
// Actually take the lock
lock.addWork(workId, worker);
}
// Commit the transaction, so that the sender worker starts with a clean slate.
if(appMsgProcTran != null && appMsgProcTran.isActive()) appMsgProcTran.commit();
if(worker != null) {
try {
worker.run();
} catch(Exception e) {
log.error("Caught exception running SandeshaWorker", e);
}
}
if (log.isDebugEnabled())
log.debug("Exit: ApplicationMsgProcessor::processResponseMessage");
}