in modules/core/src/main/java/org/apache/sandesha2/msgprocessors/AckRequestedProcessor.java [95:220]
public boolean processAckRequestedHeader(RMMsgContext rmMsgCtx, OMElement soapHeader, AckRequested ackRequested) throws AxisFault {
if (log.isDebugEnabled())
log.debug("Enter: AckRequestedProcessor::processAckRequestedHeader " + soapHeader);
//checks weather the ack request was a piggybacked one.
boolean piggybackedAckRequest = !(rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.ACK_REQUEST);
//it is a piggybacked ackrequest so we can ignore as we will piggyback acks at every opportunity anyway
if(piggybackedAckRequest){
if (log.isDebugEnabled())
log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader, it is a piggybacked ackrequest for seq " +
"so we can ignore as we will piggyback an ack " + Boolean.FALSE);
//No need to suspend. Just proceed.
return false;
}
String sequenceId = ackRequested.getIdentifier().getIdentifier();
MessageContext msgContext = rmMsgCtx.getMessageContext();
ConfigurationContext configurationContext = msgContext.getConfigurationContext();
StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,
configurationContext.getAxisConfiguration());
// Check that the sender of this AckRequest holds the correct token
RMDBean rmdBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, sequenceId);
//check security credentials
if(rmdBean!=null){
SandeshaUtil.assertProofOfPossession(rmdBean, msgContext, soapHeader);
}
// Check that the sequence requested exists
if (FaultManager.checkForUnknownSequence(rmMsgCtx, sequenceId, storageManager, piggybackedAckRequest)) {
if (log.isDebugEnabled())
log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader, Unknown sequence ");
return false;
}
// throwing a fault if the sequence is terminated
if (FaultManager.checkForSequenceTerminated(rmMsgCtx, sequenceId, rmdBean, piggybackedAckRequest)) {
if (log.isDebugEnabled())
log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader, Sequence terminated");
return false;
}
// Setting the ack depending on AcksTo.
EndpointReference acksTo = rmdBean.getAcksToEndpointReference();
if (acksTo == null || acksTo.getAddress()==null)
throw new SandeshaException(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.acksToStrNotSet));
//Getting the operation for ack messages.
AxisOperation ackOperation = SpecSpecificConstants.getWSRMOperation(
Sandesha2Constants.MessageTypes.ACK,
rmdBean.getRMVersion(),
msgContext.getAxisService());
//creating the ack message. If the ackRequest was a standalone this will be a out (response) message
MessageContext ackMsgCtx = null;
ackMsgCtx = SandeshaUtil.createNewRelatedMessageContext(rmMsgCtx, ackOperation);
//setting up the RMMsgContext
RMMsgContext ackRMMsgCtx = MsgInitializer.initializeMessage(ackMsgCtx);
ackRMMsgCtx.setRMNamespaceValue(rmMsgCtx.getRMNamespaceValue());
if (ackMsgCtx.getMessageID()==null)
ackMsgCtx.setMessageID(SandeshaUtil.getUUID());
//adding the SOAP Envelope
SOAPFactory factory = (SOAPFactory)msgContext.getEnvelope().getOMFactory();
SOAPEnvelope envelope = factory.getDefaultEnvelope();
try {
ackMsgCtx.setEnvelope(envelope);
} catch (AxisFault e3) {
throw new SandeshaException(e3.getMessage());
}
ackMsgCtx.setTo(acksTo);
ackMsgCtx.setReplyTo(msgContext.getTo());
RMMsgCreator.addAckMessage(ackRMMsgCtx, sequenceId, rmdBean, true, false);
//this is not a client generated message. So set serverSide to true.
ackMsgCtx.setServerSide(true);
if (acksTo.hasAnonymousAddress()) {
//If acksTo is anonymous we will be sending the ack here it self. Transport will use what ever mechanism to send the
//message. (for e.g. HTTP will use the back channel)
// setting "response written" since acksto is anonymous
//adding an OperationContext if one is not available. (for e.g. If we are in the SandeshaGlobalInHandler)
if (rmMsgCtx.getMessageContext().getOperationContext() == null) {
// operation context will be null when doing in a GLOBAL
// handler.
ServiceContext serviceCtx = msgContext.getServiceContext();
OperationContext opCtx = OperationContextFactory.createOperationContext(ackOperation.getAxisSpecificMEPConstant(), ackOperation, serviceCtx);
rmMsgCtx.getMessageContext().setOperationContext(opCtx);
}
try {
AxisEngine.send(ackMsgCtx);
TransportUtils.setResponseWritten(rmMsgCtx.getMessageContext(), true);
} catch (AxisFault e1) {
throw new SandeshaException(e1.getMessage());
}
} else {
SandeshaPolicyBean propertyBean = SandeshaUtil.getPropertyBean(msgContext.getAxisOperation());
long ackInterval = propertyBean.getAcknowledgementInterval();
// Ack will be sent as stand alone, only after the acknowledgement interval
long timeToSend = System.currentTimeMillis() + ackInterval;
AcknowledgementManager.addAckBeanEntry(ackRMMsgCtx, sequenceId, timeToSend, storageManager);
}
if (log.isDebugEnabled())
log.debug("Exit: AckRequestedProcessor::processAckRequestedHeader " + Boolean.FALSE);
//No need to suspend. Just proceed.
return false;
}