in modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java [570:682]
/**
 * Finds sender beans that are flagged to be sent, have no transport
 * available, and whose time-to-send is older than
 * {@link Sandesha2Constants#TRANSPORT_WAIT_TIME}; these are treated as
 * orphans waiting for a MakeConnection that has not arrived.
 * <p>
 * For each orphan a warning is logged at most once per sequence ID (tracked
 * in {@code warnedAlreadyOrphans}). Orphaned TERMINATE_SEQ and
 * TERMINATE_SEQ_RESPONSE beans are deleted; every other orphan has its
 * time-to-send reset to "now" so it is not reported again until another
 * TRANSPORT_WAIT_TIME has elapsed.
 * <p>
 * Any exception raised while processing is logged at debug level and
 * swallowed, and the active transaction is rolled back.
 *
 * @param manager storage manager used to obtain transactions and sender beans
 * @throws SandeshaStorageException declared for callers; exceptions raised
 *         inside the try block are caught and logged rather than propagated
 */
private void checkForOrphanMessages(StorageManager manager) throws SandeshaStorageException {
    if (log.isDebugEnabled())
        log.debug("Enter: Sender::checkForOrphanMessages");

    Transaction tran = null;
    try {
        tran = manager.getTransaction();

        // This finder will look for beans that should have been sent, but
        // could not be sent because they need a MakeConnection message to
        // come in to pick them up. We also factor in TRANSPORT_WAIT_TIME to
        // give the MakeConnection a chance to arrive.
        SenderBean finder = new SenderBean();
        finder.setSend(true);
        finder.setTransportAvailable(false);
        finder.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);
        List<SenderBean> beans = manager.getSenderBeanMgr().find(finder);

        // Commit the read transaction and start a fresh one for the updates.
        tran.commit();
        tran = manager.getTransaction();

        for (SenderBean bean : beans) {
            // Emit a message to warn the user that MakeConnections are not
            // arriving to pick messages up.
            if (log.isWarnEnabled()) {
                String internalSequenceID = bean.getInternalSequenceID();
                String sequenceID = bean.getSequenceID();
                // We only want to log.warn once per orphaned sequence ID.
                if (!warnedAlreadyOrphans.containsKey(sequenceID)) {
                    String message;
                    if (bean.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
                        message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, sequenceID, internalSequenceID);
                    } else {
                        String messageType = Integer.toString(bean.getMessageType());
                        message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPollingProtocol, messageType, sequenceID, internalSequenceID);
                    }
                    warnedAlreadyOrphans.put(sequenceID, System.currentTimeMillis());
                    log.warn(message);
                }
            }

            // If the client shuts down too quickly, termination messages get
            // orphaned and this has an impact on performance. Delete these
            // once they have been recognised as orphans.
            int messageType = bean.getMessageType();
            if (MessageTypes.TERMINATE_SEQ == messageType || MessageTypes.TERMINATE_SEQ_RESPONSE == messageType) {
                String id = bean.getSequenceID(); // get this again as it is an error case

                // Mark the sequence as terminated.
                // NOTE(review): assumes the lookup can return null when no
                // RMSBean matches this sequence ID - confirm. The guard keeps
                // a missing bean from aborting the whole pass so the orphan
                // below is still deleted.
                RMSBean rmsBean = SandeshaUtil.getRMSBeanFromSequenceId(manager, id);
                if (rmsBean != null) {
                    // NOTE(review): this passes the 'storageManager' field rather
                    // than the 'manager' parameter - confirm they are the same.
                    TerminateManager.terminateSendingSide(rmsBean, storageManager, false, null);
                }

                if(log.isDebugEnabled()) log.debug("Sender::checkForOrphanMessages. Orphaned message of type TERMINATE_SEQ or TERMINATE_SEQ_RESPONSE found. Deleting this message with a sequence ID of : " + id);

                // Delete the terminate sender bean.
                manager.getSenderBeanMgr().delete(bean.getMessageID());
            } else {
                // Update the bean so that we won't emit another message for
                // another TRANSPORT_WAIT_TIME.
                bean.setTimeToSend(System.currentTimeMillis());
                manager.getSenderBeanMgr().update(bean);
            }

            purgeStaleOrphanWarnings();
        }

        if (tran != null && tran.isActive())
            tran.commit();
        tran = null;
    } catch (Exception e) {
        // There isn't much we can do here, so log the exception and
        // continue.
        if (log.isDebugEnabled())
            log.debug("Exception", e);
    } finally {
        // Anything still active at this point did not commit cleanly.
        if (tran != null && tran.isActive())
            tran.rollback();
    }

    if (log.isDebugEnabled())
        log.debug("Exit: Sender::checkForOrphanMessages");
}

/**
 * Trims {@code warnedAlreadyOrphans} when it holds more than 1000 entries or
 * when more than ten minutes have passed since the last cleanup, dropping
 * every entry whose recorded warning time is over an hour old.
 */
private void purgeStaleOrphanWarnings() {
    long currentTime = System.currentTimeMillis();
    if (lastRanCleanup == 0) {
        // First call: start the ten-minute cleanup timer from now.
        lastRanCleanup = currentTime;
    }

    if (warnedAlreadyOrphans.size() > 1000 || currentTime > (lastRanCleanup + 600000)) {
        if(log.isDebugEnabled()) log.debug("Sender::checkForOrphanMessages. Cleaning up list of orphans");

        long timeAnHourAgo = currentTime - 3600000;
        Iterator<String> it = warnedAlreadyOrphans.keySet().iterator();
        while (it.hasNext()) {
            String key = it.next();
            long warnedAt = ((Long) warnedAlreadyOrphans.get(key)).longValue();
            if (warnedAt < timeAnHourAgo) {
                // Remove through the iterator: removing through the map while
                // iterating its key set can throw
                // ConcurrentModificationException.
                it.remove();
            }
        }
        lastRanCleanup = System.currentTimeMillis();
    }
}