in modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java [62:161]
public static void piggybackAcksIfPresent(RMMsgContext rmMessageContext, StorageManager storageManager) throws SandeshaException {
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Enter: AcknowledgementManager::piggybackAcksIfPresent");
// If this message is going to an anonymous address, and the inbound
// sequence has
// anonymous acksTo, then we add in an ack for the inbound sequence.
EndpointReference target = rmMessageContext.getTo();
if (target == null || target.hasAnonymousAddress()) {
// We have no good indicator of the identity of the destination, so
// the only sequence
// we can ack is the inbound one that caused us to create this
// response.
String inboundSequence = (String) rmMessageContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID);
if (inboundSequence != null) {
RMDBean inboundBean = SandeshaUtil.getRMDBeanFromSequenceId(storageManager, inboundSequence);
if (inboundBean != null && !inboundBean.isTerminated()) {
EndpointReference acksToEPR = inboundBean.getAcksToEndpointReference();
if (acksToEPR == null || acksToEPR.hasAnonymousAddress()) {
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Piggybacking ack for inbound sequence: " + inboundSequence);
RMMsgCreator.addAckMessage(rmMessageContext, inboundSequence, inboundBean, false, true);
}
}
}
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, anon");
return;
} else {
// an addressable EPR
if (SandeshaUtil.hasReferenceParameters(target)) {
// we should not proceed since we cannot properly compare ref
// params
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, target has refParams");
return;
}
String inboundSequence = (String) rmMessageContext.getProperty(Sandesha2Constants.MessageContextProperties.INBOUND_SEQUENCE_ID);
// If there's an inbound sequence (i.e. we're provider side) we'll
// use that, otherwise
// we'll go to the expense of looking the sequence up by the acksTo
// address.
if (inboundSequence != null) {
// We used to look for an ack sender bean before piggybacking an
// ack, but in the high-througput
// scenarios there always was one, and in the low thoughput
// scenarios it's less of an issue if
// we piggyback when we don't have to. so for now, lets mimic
// the old high-throughout behaviour
// in a cheap way by always piggybacking.
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Piggybacking ack for sequence: " + inboundSequence);
RMDBean sequence = storageManager.getRMDBeanMgr().retrieve(inboundSequence);
if (sequence != null && !sequence.isTerminated()) {
RMMsgCreator.addAckMessage(rmMessageContext, inboundSequence, sequence, false, true);
((Sender) storageManager.getSender()).removeScheduledAcknowledgement(inboundSequence);
}
} else {
RMDBean findRMDBean = new RMDBean();
findRMDBean.setAcksToEndpointReference(target);
findRMDBean.setTerminated(false);
Collection<RMDBean> rmdBeans = storageManager.getRMDBeanMgr().find(findRMDBean);
Iterator<RMDBean> sequences = rmdBeans.iterator();
while (sequences.hasNext()) {
RMDBean sequence = (RMDBean) sequences.next();
if (SandeshaUtil.hasReferenceParameters(sequence.getAcksToEndpointReference())) {
// we should not piggy back if there are reference
// parameters in the acksTo EPR since we cannot compare
// them
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent, target has refParams");
break;
}
String sequenceId = sequence.getSequenceID();
// We used to look for an ack sender bean before
// piggybacking an ack, but in the high-througput
// scenarios there always was one, and in the low thoughput
// scenarios it's less of an issue if
// we piggyback when we don't have to. so for now, lets
// mimic the old high-throughout behaviour
// in a cheap way by always piggybacking.
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Piggybacking ack for sequence: " + sequenceId);
RMMsgCreator.addAckMessage(rmMessageContext, sequenceId, sequence, false, true);
((Sender) storageManager.getSender()).removeScheduledAcknowledgement(sequenceId);
}
}
}
if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled())
log.debug("Exit: AcknowledgementManager::piggybackAcksIfPresent");
return;
}