in modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java [467:568]
/**
 * Frees request-response transports that are being held by sender beans
 * which are not yet marked for sending.
 *
 * <p>Inside a single transaction obtained from {@code manager}, this looks
 * up sender beans with {@code send == false}, {@code transportAvailable ==
 * true} and a time-to-send older than
 * {@link Sandesha2Constants#TRANSPORT_WAIT_TIME}. For each such bean whose
 * inbound message still carries a {@link RequestResponseTransport} in the
 * {@code WAITING} state, the transport is released: an acknowledgement is
 * sent when the inbound sequence has an anonymous acksTo address, otherwise
 * the response is flagged as not written. The bean is then marked as having
 * no transport, its send time is reset to "now", and it is updated in the
 * store.
 *
 * <p>This is best-effort: any exception raised while processing is logged
 * at debug level only and the transaction is rolled back.
 *
 * @param manager the storage manager used for the transaction and for all
 *                bean and message-context lookups
 * @throws SandeshaStorageException declared by the signature; exceptions
 *         raised inside the try block are caught, so presumably only a
 *         failure while rolling back could propagate (TODO confirm)
 */
private void unblockTransportThreads(StorageManager manager) throws SandeshaStorageException {
    if (log.isDebugEnabled()) {
        log.debug("Enter: Sender::unblockTransportThreads");
    }

    Transaction transaction = null;
    try {
        transaction = manager.getTransaction();

        // Build a finder for beans that have been locking the transport for
        // longer than TRANSPORT_WAIT_TIME. Per the original author's note,
        // the match method for SenderBeans does the time comparison for us.
        SenderBean finder = new SenderBean();
        finder.setSend(false);
        finder.setTransportAvailable(true);
        finder.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);

        List<SenderBean> beans = manager.getSenderBeanMgr().find(finder);
        for (SenderBean bean : beans) {
            // The beans we have found are assigned to an internal sequence
            // id, but the create sequence has not completed yet (and perhaps
            // never will).

            // Load the message so that we can free the transport (if there
            // is one). The case we are trying to free up is a
            // request-response transport that is still there waiting.
            MessageContext msgCtx = manager.retrieveMessageContext(bean.getMessageContextRefKey(), context);
            if (msgCtx == null) {
                // Without this guard a single unloadable message would raise
                // a NullPointerException below, abort the loop and roll back
                // the work done for every other bean in this pass.
                if (log.isDebugEnabled()) {
                    log.debug("No message context found for key " + bean.getMessageContextRefKey());
                }
                continue;
            }

            // Walk from the stored message to the inbound message of the same
            // operation, which is where the transport control object lives.
            MessageContext inMsg = null;
            OperationContext op = msgCtx.getOperationContext();
            if (op != null) {
                inMsg = op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
            }

            RequestResponseTransport t = null;
            if (inMsg != null) {
                t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
            }

            // Nothing to release unless a transport exists and is still waiting.
            if (t == null || !RequestResponseTransportStatus.WAITING.equals(t.getStatus())) {
                continue;
            }

            if (log.isWarnEnabled()) {
                String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.freeingTransport);
                log.warn(message);
            }

            // If the message is a reply, then the request may need to be
            // acked. Rather than just return a HTTP 202, we should try to
            // send an ack. That is only done when the inbound sequence's
            // acksTo address is anonymous.
            RMDBean inbound = null;
            String inboundSeq = bean.getInboundSequenceId();
            if (inboundSeq != null) {
                inbound = SandeshaUtil.getRMDBeanFromSequenceId(manager, inboundSeq);
            }

            boolean sendAck = false;
            if (inbound != null) {
                EndpointReference acksToEPR = inbound.getAcksToEndpointReference();
                sendAck = acksToEPR != null && acksToEPR.hasAnonymousAddress();
            }

            if (sendAck) {
                RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
                // Use the same storage manager that owns the current
                // transaction and served every other lookup in this method.
                RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(
                        rmMsgCtx, inbound, inbound.getSequenceID(), manager, true);
                AcknowledgementManager.sendAckNow(ackRMMsgCtx);
                TransportUtils.setResponseWritten(msgCtx, true);
            } else {
                TransportUtils.setResponseWritten(msgCtx, false);
            }

            // Mark the bean so that we know the transport is missing, reset
            // the send time, and persist the change.
            bean.setTransportAvailable(false);
            bean.setTimeToSend(System.currentTimeMillis());
            manager.getSenderBeanMgr().update(bean);
        }

        if (transaction != null && transaction.isActive()) {
            transaction.commit();
        }
        // Cleared so that the finally block does not attempt a rollback.
        transaction = null;
    } catch (Exception e) {
        // There isn't much we can do here, so log the exception and
        // continue. This is deliberately best-effort.
        if (log.isDebugEnabled()) {
            log.debug("Exception", e);
        }
    } finally {
        // Still non-null only if the commit above was not reached.
        if (transaction != null && transaction.isActive()) {
            transaction.rollback();
        }
    }

    if (log.isDebugEnabled()) {
        log.debug("Exit: Sender::unblockTransportThreads");
    }
}