in modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java [261:366]
public static void replyToPoll(RMMsgContext pollMessage,
SenderBean matchingMessage,
StorageManager storageManager,
boolean pending,
String makeConnectionNamespace,
Transaction transaction)
throws AxisFault {
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Enter: MakeConnectionProcessor::replyToPoll");
TransportOutDescription transportOut = pollMessage.getMessageContext().getTransportOut();
if (transportOut == null) {
String message = SandeshaMessageHelper.getMessage(
SandeshaMessageKeys.cantSendMakeConnectionNoTransportOut);
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug(message);
throw new SandeshaException(message);
}
String messageStorageKey = matchingMessage.getMessageContextRefKey();
MessageContext returnMessage = storageManager.retrieveMessageContext(messageStorageKey, pollMessage.getConfigurationContext());
if (returnMessage == null) {
String message = "Cannot find the message stored with the key:" + messageStorageKey;
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug(message);
// Someone else has either removed the sender & message, or another make connection got here first.
return;
}
if (pending) addMessagePendingHeader(returnMessage, makeConnectionNamespace);
boolean continueSending = true;
RMMsgContext returnRMMsg = MsgInitializer.initializeMessage(returnMessage);
if (returnRMMsg.getRMNamespaceValue() == null) {
//this is the case when a stored application response msg was not sucecsfully returned
//on the sending transport's backchannel. Since the msg was stored without a sequence header
//we need to lookup the namespace using the RMS bean
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Looking up rmNamespace from RMS bean");
String sequenceID = matchingMessage.getSequenceID();
if (sequenceID != null) {
RMSBean rmsBean = new RMSBean();
rmsBean.setSequenceID(sequenceID);
rmsBean = storageManager.getRMSBeanMgr().findUnique(rmsBean);
if (rmsBean != null) {
returnRMMsg.setRMNamespaceValue(SpecSpecificConstants.getRMNamespaceValue(rmsBean.getRMVersion()));
} else {
//we will never be able to reply to this msg - at the moment the best bet is
//to not process the reply anymore
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Could not find RMS bean for polled msg");
continueSending = false;
//also remove the sender bean so that we do not select this again
storageManager.getSenderBeanMgr().delete(matchingMessage.getMessageID());
}
}
}
if (continueSending) {
// Commit the current transaction, so that the SenderWorker can do it's own locking
// this transaction should be commited out before gettting the worker lock.
// otherwise a dead lock can happen.
if (transaction != null && transaction.isActive()) transaction.commit();
SandeshaThread sender = storageManager.getSender();
WorkerLock lock = sender.getWorkerLock();
String workId = matchingMessage.getMessageID();
SenderWorker worker = new SenderWorker(pollMessage.getConfigurationContext(), matchingMessage, pollMessage.getRMSpecVersion());
worker.setLock(lock);
worker.setWorkId(workId);
while (!lock.addWork(workId, worker)) {
try {
// wait on the lock.
lock.awaitRemoval(workId);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
setTransportProperties(returnMessage, pollMessage);
// Link the response to the request
AxisOperation operation = SpecSpecificConstants.getWSRMOperation(Sandesha2Constants.MessageTypes.POLL_RESPONSE_MESSAGE, pollMessage.getRMSpecVersion(), pollMessage.getMessageContext().getAxisService());
OperationContext context = new OperationContext(operation, pollMessage.getMessageContext().getServiceContext());
context.addMessageContext(returnMessage);
returnMessage.setServiceContext(null);
returnMessage.setOperationContext(context);
returnMessage.setProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE, Boolean.TRUE);
returnMessage.setProperty(RequestResponseTransport.TRANSPORT_CONTROL, pollMessage.getProperty(RequestResponseTransport.TRANSPORT_CONTROL));
//running the MakeConnection through a SenderWorker.
//This will allow Sandesha2 to consider both of following senarios equally.
// 1. A message being sent by the Sender thread.
// 2. A message being sent as a reply to an MakeConnection.
worker.setMessage(returnRMMsg);
worker.run();
TransportUtils.setResponseWritten(pollMessage.getMessageContext(), true);
}
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Exit: MakeConnectionProcessor::replyToPoll");
}