public static void replyToPoll()

in modules/core/src/main/java/org/apache/sandesha2/msgprocessors/MakeConnectionProcessor.java [261:366]


	public static void replyToPoll(RMMsgContext pollMessage,
								   SenderBean matchingMessage,
								   StorageManager storageManager,
								   boolean pending,
								   String makeConnectionNamespace,
								   Transaction transaction)
			throws AxisFault {
		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
			log.debug("Enter: MakeConnectionProcessor::replyToPoll");
		TransportOutDescription transportOut = pollMessage.getMessageContext().getTransportOut();
		if (transportOut == null) {
			String message = SandeshaMessageHelper.getMessage(
					SandeshaMessageKeys.cantSendMakeConnectionNoTransportOut);
			if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug(message);
			throw new SandeshaException(message);
		}

		String messageStorageKey = matchingMessage.getMessageContextRefKey();
		MessageContext returnMessage = storageManager.retrieveMessageContext(messageStorageKey, pollMessage.getConfigurationContext());
		if (returnMessage == null) {
			String message = "Cannot find the message stored with the key:" + messageStorageKey;
			if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) log.debug(message);
			// Someone else has either removed the sender & message, or another make connection got here first.
			return;
		}

		if (pending) addMessagePendingHeader(returnMessage, makeConnectionNamespace);
		boolean continueSending = true;
		RMMsgContext returnRMMsg = MsgInitializer.initializeMessage(returnMessage);
		if (returnRMMsg.getRMNamespaceValue() == null) {
			//this is the case when a stored application response msg was not sucecsfully returned
			//on the sending transport's backchannel. Since the msg was stored without a sequence header
			//we need to lookup the namespace using the RMS bean
			if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
				log.debug("Looking up rmNamespace from RMS bean");
			String sequenceID = matchingMessage.getSequenceID();
			if (sequenceID != null) {
				RMSBean rmsBean = new RMSBean();
				rmsBean.setSequenceID(sequenceID);
				rmsBean = storageManager.getRMSBeanMgr().findUnique(rmsBean);
				if (rmsBean != null) {
					returnRMMsg.setRMNamespaceValue(SpecSpecificConstants.getRMNamespaceValue(rmsBean.getRMVersion()));
				} else {
					//we will never be able to reply to this msg - at the moment the best bet is
					//to not process the reply anymore
					if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
						log.debug("Could not find RMS bean for polled msg");
					continueSending = false;
					//also remove the sender bean so that we do not select this again
					storageManager.getSenderBeanMgr().delete(matchingMessage.getMessageID());
				}
			}
		}

		if (continueSending) {

			// Commit the current transaction, so that the SenderWorker can do it's own locking
			// this transaction should be commited out before gettting the worker lock.
			// otherwise a dead lock can happen.
			if (transaction != null && transaction.isActive()) transaction.commit();

			SandeshaThread sender = storageManager.getSender();
			WorkerLock lock = sender.getWorkerLock();

			String workId = matchingMessage.getMessageID();
			SenderWorker worker = new SenderWorker(pollMessage.getConfigurationContext(), matchingMessage, pollMessage.getRMSpecVersion());
			worker.setLock(lock);
			worker.setWorkId(workId);
			while (!lock.addWork(workId, worker)) {
				try {
					// wait on the lock.
					lock.awaitRemoval(workId);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}

			}

			setTransportProperties(returnMessage, pollMessage);

			// Link the response to the request

			AxisOperation operation = SpecSpecificConstants.getWSRMOperation(Sandesha2Constants.MessageTypes.POLL_RESPONSE_MESSAGE, pollMessage.getRMSpecVersion(), pollMessage.getMessageContext().getAxisService());
			OperationContext context = new OperationContext(operation, pollMessage.getMessageContext().getServiceContext());

			context.addMessageContext(returnMessage);
			returnMessage.setServiceContext(null);
			returnMessage.setOperationContext(context);

			returnMessage.setProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE, Boolean.TRUE);
			returnMessage.setProperty(RequestResponseTransport.TRANSPORT_CONTROL, pollMessage.getProperty(RequestResponseTransport.TRANSPORT_CONTROL));

			//running the MakeConnection through a SenderWorker.
			//This will allow Sandesha2 to consider both of following senarios equally.
			//  1. A message being sent by the Sender thread.
			//  2. A message being sent as a reply to an MakeConnection.

			worker.setMessage(returnRMMsg);
			worker.run();

			TransportUtils.setResponseWritten(pollMessage.getMessageContext(), true);
		}

		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
			log.debug("Exit: MakeConnectionProcessor::replyToPoll");
	}