in modules/core/src/main/java/org/apache/sandesha2/util/TerminateManager.java [259:392]
/**
 * Removes the sender-side entries stored for the given internal sequence and, when
 * requested, hands the not-yet-acknowledged application messages over to
 * {@code SandeshaUtil.reallocateMessagesToNewSequence}.
 * <p>
 * Reallocation is only attempted when the client-completed messages of {@code rmsBean}
 * form exactly one range. Otherwise the bean is marked
 * {@code REALLOCATION_FAILED} and the method falls back to plain deletion.
 * <p>
 * Sender beans of type {@code TERMINATE_SEQ} are left untouched unless
 * {@code rmsBean.isTerminated()} is true.
 *
 * @param internalSequenceId   key used to look up the sender beans to clean
 * @param storageManager       source of the bean managers, message contexts and transactions
 * @param rmsBean              bean describing the sequence; its reallocation state may be updated
 * @param reallocateIfPossible whether unacknowledged application messages should be reallocated
 * @param transaction          transaction passed on to the reallocation helper; afterwards this
 *                             method works with {@code storageManager.getTransaction()}
 * @return {@code true} only if the reallocation and its follow-up bookkeeping completed
 *         without an exception; {@code false} in every other case
 * @throws SandeshaException if the storage layer fails, or (as a
 *                           {@code SandeshaStorageException}) if the dummy message cannot be created
 */
private static boolean cleanSendingSideData(String internalSequenceId, StorageManager storageManager,
		RMSBean rmsBean, boolean reallocateIfPossible, Transaction transaction) throws SandeshaException {
	if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
		log.debug("Enter: TerminateManager::cleanSendingSideData " + internalSequenceId + ", " + reallocateIfPossible);
	}

	boolean reallocatedOK = false;
	SenderBeanMgr retransmitterBeanMgr = storageManager.getSenderBeanMgr();

	// removing retransmitterMgr entries and corresponding message contexts.
	Collection<SenderBean> collection = retransmitterBeanMgr.find(internalSequenceId);

	List<MessageContext> msgsToReallocate = null;
	if (reallocateIfPossible) {
		msgsToReallocate = new LinkedList<MessageContext>();
	}

	Range[] ranges = rmsBean.getClientCompletedMessages().getRanges();
	long lastAckedMsg = -1;
	if (ranges.length == 1) {
		// the sequence is a single contiguous acked range
		lastAckedMsg = ranges[0].upperValue;
	} else if (reallocateIfPossible) {
		// cannot reallocate as there are gaps
		rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
		storageManager.getRMSBeanMgr().update(rmsBean);
		reallocateIfPossible = false;
		if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
			log.debug("cannot reallocate sequence as there are gaps");
		}
	}

	for (SenderBean retransmitterBean : collection) {
		// remove all but terminate sequence messages
		if (retransmitterBean.getMessageType() == Sandesha2Constants.MessageTypes.TERMINATE_SEQ
				&& !rmsBean.isTerminated()) {
			continue;
		}

		String messageStoreKey = retransmitterBean.getMessageContextRefKey();

		// if we have been asked to reallocate we need to send all unacked messages to a new sequence.
		// We must ensure that we retrieve these messages in the correct order
		if (reallocateIfPossible
				&& retransmitterBean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION
				&& retransmitterBean.getMessageNumber() == lastAckedMsg + 1) {
			if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
				log.debug("adding message for reallocate: " + retransmitterBean.getMessageNumber());
			}
			// this application message is the next one after the last acked/collected message,
			// so collect it for reallocation and remove it from the store
			msgsToReallocate.add(storageManager.retrieveMessageContext(messageStoreKey, storageManager.getContext()));
			retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
			storageManager.removeMessageContext(messageStoreKey);
			lastAckedMsg++;
		} else if (reallocateIfPossible) {
			// we are reallocating but this message does not fit the criteria. We should not delete it
			if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
				log.debug("cannot reallocate: " + retransmitterBean.getMessageNumber());
			}
			if (msgsToReallocate.size() == 0) {
				try {
					// however we might need this message if there are no messages to reallocate but we still
					// need a new sequence - we use a dummy message
					MessageContext dummy = SandeshaUtil.cloneMessageContext(
							storageManager.retrieveMessageContext(messageStoreKey, storageManager.getContext()));
					dummy.getOptions().setProperty(SandeshaClientConstants.DUMMY_MESSAGE, Sandesha2Constants.VALUE_TRUE);
					msgsToReallocate.add(dummy);
				} catch (Exception e) {
					if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
						log.debug("Exit: TerminateManager::cleanSendingSideData " + e);
					}
					throw new SandeshaStorageException(e);
				}
			}
		} else {
			// we are not reallocating so just delete the messages
			retransmitterBeanMgr.delete(retransmitterBean.getMessageID());
			storageManager.removeMessageContext(messageStoreKey);
		}
	}

	if (reallocateIfPossible) {
		try {
			SandeshaUtil.reallocateMessagesToNewSequence(storageManager, rmsBean, msgsToReallocate, transaction);

			// If the reallocation was successful and the RMSBean being reallocated was originally
			// created for reallocation, reset its reallocation state.
			transaction = storageManager.getTransaction();
			if (rmsBean.isReallocated() == Sandesha2Constants.WSRM_COMMON.ORIGINAL_REALLOCATED_BEAN_COMPLETE) {
				rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.NOT_REALLOCATED);
				storageManager.getRMSBeanMgr().update(rmsBean);
			}
			if (transaction != null && transaction.isActive()) {
				transaction.commit();
			}
			transaction = null;

			// Only report success once the bookkeeping above has been committed; a failure there
			// ends up in the catch block, which marks the sequence as REALLOCATION_FAILED.
			reallocatedOK = true;
		} catch (Exception e) {
			// A reallocation failure must be visible at warn level, not only when debug is on
			if (LoggingControl.isAnyTracingEnabled() && log.isWarnEnabled()) {
				log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.reallocationFailed, rmsBean.getSequenceID()), e);
			}
			if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
				log.debug("Reallocation failed for messages on sequence " + rmsBean.getSequenceID() + " because of " + e);
			}

			// Reallocation Failed
			// Need to mark any RMSBeans involved as failed so that we don't attempt to send
			// anymore messages on these seq's. The client will have to manually reallocate and
			// administer the sequences.
			transaction = storageManager.getTransaction();
			rmsBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
			storageManager.getRMSBeanMgr().update(rmsBean);

			String intSeqIDOfOriginallyReallocatedSeq = rmsBean.getInternalSeqIDOfSeqUsedForReallocation();
			if (intSeqIDOfOriginallyReallocatedSeq != null) {
				RMSBean origRMSBean = SandeshaUtil.getRMSBeanFromInternalSequenceId(storageManager, intSeqIDOfOriginallyReallocatedSeq);
				// Guard against a missing bean so the original failure is not masked by a NullPointerException
				if (origRMSBean != null) {
					origRMSBean.setReallocated(Sandesha2Constants.WSRM_COMMON.REALLOCATION_FAILED);
					storageManager.getRMSBeanMgr().update(origRMSBean);
				}
			}
			if (transaction != null && transaction.isActive()) {
				transaction.commit();
			}
			transaction = null;
		} finally {
			// Anything still active here was not committed above, so undo it
			if (transaction != null && transaction.isActive()) {
				transaction.rollback();
			}
		}
	}

	if (LoggingControl.isAnyTracingEnabled() && log.isDebugEnabled()) {
		log.debug("Exit: TerminateManager::cleanSendingSideData " + reallocatedOK);
	}
	return reallocatedOK;
}