private void unblockTransportThreads()

in modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java [467:568]


	private void unblockTransportThreads(StorageManager manager) throws SandeshaStorageException {
		if (log.isDebugEnabled())
			log.debug("Enter: Sender::unblockTransportThreads");

		Transaction transaction = null;
		try {
			transaction = manager.getTransaction();

			// This finder will look for beans that have been locking the
			// transport for longer than
			// the TRANSPORT_WAIT_TIME. The match method for SenderBeans does
			// the time comparison
			// for us.
			SenderBean finder = new SenderBean();
			finder.setSend(false);
			finder.setTransportAvailable(true);
			finder.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);

			List<SenderBean> beans = manager.getSenderBeanMgr().find(finder);
			Iterator<SenderBean> beanIter = beans.iterator();
			while (beanIter.hasNext()) {
				// The beans we have found are assigned to an internal sequence
				// id, but the create
				// sequence has not completed yet (and perhaps never will).
				// Server-side, most of the
				// info that we can usefully print is associated with the
				// inbound sequence that generated
				// this message.
				SenderBean bean = (SenderBean) beanIter.next();

				// Load the message, so that we can free the transport (if there
				// is one there). The
				// case we are trying to free up is when there is a
				// request-response transport, and
				// it's still there waiting.
				MessageContext msgCtx = manager.retrieveMessageContext(bean.getMessageContextRefKey(), context);

				RequestResponseTransport t = null;
				MessageContext inMsg = null;
				OperationContext op = msgCtx.getOperationContext();
				if (op != null)
					inMsg = op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
				if (inMsg != null)
					t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);

				if ((t != null && RequestResponseTransportStatus.WAITING.equals(t.getStatus()))) {
					if (log.isWarnEnabled()) {
						String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.freeingTransport);
						log.warn(message);
					}
					// If the message is a reply, then the request may need to
					// be acked. Rather
					// than just return a HTTP 202, we should try to send an
					// ack.
					boolean sendAck = false;
					RMDBean inbound = null;
					String inboundSeq = bean.getInboundSequenceId();
					if (inboundSeq != null)
						inbound = SandeshaUtil.getRMDBeanFromSequenceId(manager, inboundSeq);

					if (inbound != null) {
						EndpointReference acksToEPR = inbound.getAcksToEndpointReference();
						if (acksToEPR != null && acksToEPR.hasAnonymousAddress())
							sendAck = true;
					}

					if (sendAck) {
						RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
						RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(rmMsgCtx, inbound, inbound.getSequenceID(), storageManager, true);
						AcknowledgementManager.sendAckNow(ackRMMsgCtx);
						TransportUtils.setResponseWritten(msgCtx, true);
					} else {
						TransportUtils.setResponseWritten(msgCtx, false);
					}

					// Mark the bean so that we know the transport is missing,
					// and reset the send time
					bean.setTransportAvailable(false);
					bean.setTimeToSend(System.currentTimeMillis());

					// Update the bean
					manager.getSenderBeanMgr().update(bean);
				}
			}

			if (transaction != null && transaction.isActive())
				transaction.commit();
			transaction = null;

		} catch (Exception e) {
			// There isn't much we can do here, so log the exception and
			// continue.
			if (log.isDebugEnabled())
				log.debug("Exception", e);
		} finally {
			if (transaction != null && transaction.isActive())
				transaction.rollback();
		}

		if (log.isDebugEnabled())
			log.debug("Exit: Sender::unblockTransportThreads");
	}