in qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java [1567:1622]
public void run() {
try {
SendCompletion completion = asyncSendQueue.peek();
if (completion.getEnvelope().getDispatchId() == envelope.getDispatchId()) {
try {
completion = asyncSendQueue.remove();
if (cause == null) {
completion.markAsComplete();
} else {
completion.markAsFailed(JmsExceptionSupport.create(cause));
}
completion.signalCompletion();
} catch (Throwable error) {
LOG.error("Failure while performing completion for send: {}", envelope, error);
}
// Signal any trailing completions that have been marked complete
// before this one was that they have now that the one in front has
Iterator<SendCompletion> pending = asyncSendQueue.iterator();
while (pending.hasNext()) {
completion = pending.next();
if (completion.hasCompleted()) {
try {
completion.signalCompletion();
} catch (Throwable error) {
LOG.error("Failure while performing completion for send: {}", envelope, error);
} finally {
pending.remove();
}
} else {
break;
}
}
} else {
// Not head so mark as complete and wait for the one in front to send
// the notification of completion.
Iterator<SendCompletion> pending = asyncSendQueue.iterator();
while (pending.hasNext()) {
completion = pending.next();
if (completion.getEnvelope().getDispatchId() == envelope.getDispatchId()) {
if (cause == null) {
completion.markAsComplete();
} else {
completion.markAsFailed(JmsExceptionSupport.create(cause));
}
}
}
}
if (closed.get() && asyncSendsCompletion != null && asyncSendQueue.isEmpty()) {
asyncSendsCompletion.onSuccess();
}
} catch (Exception ex) {
LOG.error("Async completion task encountered unexpected failure", ex);
}
}