in qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java [331:439]
protected boolean shutdown(Throwable cause) throws JMSException {
    // Local shutdown of this session. Only the caller that wins the compare-and-set on the
    // closed flag performs any work; every other caller returns false immediately.
    //
    // Order of operations for the winning caller:
    //   1. mark the session info CLOSED and record the failure cause,
    //   2. stop the session, then shut down every consumer and every producer,
    //   3. shut down the transaction context,
    //   4. for CLIENT_ACKNOWLEDGE sessions, issue the SESSION_SHUTDOWN acknowledgement,
    //   5. wait (bounded by the connection close timeout) for pending asynchronous sends,
    //      then fail or complete whatever remains and shut down the completion executor,
    //   6. always remove this session from its connection.
    //
    // Returns true when at least one consumer reported a message listener before it was
    // shut down. The first JMSException captured in steps 2 or 3 is rethrown once the
    // remaining steps have run; any other Throwable is converted via JmsExceptionSupport.
    if (!closed.compareAndSet(false, true)) {
        return false;
    }

    boolean hadListener = false;
    JMSException firstFailure = null;

    try {
        sessionInfo.setState(ResourceState.CLOSED);
        setFailureCause(cause);

        try {
            stop();

            // Iterate over snapshots so the underlying maps may change while the
            // individual resources are being shut down.
            for (JmsMessageConsumer consumer : new ArrayList<JmsMessageConsumer>(this.consumers.values())) {
                hadListener |= consumer.hasMessageListener();
                consumer.shutdown(cause);
            }

            for (JmsMessageProducer producer : new ArrayList<JmsMessageProducer>(this.producers.values())) {
                producer.shutdown(cause);
            }
        } catch (JMSException resourceError) {
            // The first failure ends both loops; it is remembered and thrown after cleanup.
            firstFailure = resourceError;
        }

        // Sample the in-doubt state before shutting the context down, since it decides
        // how a failure of that shutdown is reported.
        final boolean txInDoubt = transactionContext.isInDoubt();
        try {
            transactionContext.shutdown();
        } catch (JMSException txError) {
            if (txInDoubt) {
                LOG.trace("Rollback of indoubt transaction failed due to error: ", txError);
            } else {
                LOG.warn("Rollback of active transaction failed due to error: ", txError);
                // An earlier resource failure takes precedence over the transaction failure.
                if (firstFailure == null) {
                    firstFailure = txError;
                }
            }
        }

        // Best-effort acknowledgement; a failure here is logged and never propagated.
        try {
            if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
                acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
            }
        } catch (Exception ackError) {
            LOG.trace("Exception during session shutdown cleanup acknowledgement", ackError);
        }

        // Make sure no asynchronous completion stays blocked after close, while still giving
        // in-flight asynchronous sends up to the close timeout to finish normally.
        final ExecutorService asyncCompleter = getCompletionExecutor();

        try {
            synchronized (sessionInfo) {
                // With the producers shut down, install the future that signals completion of
                // the outstanding asynchronous sends, then queue a quick check on the executor
                // that completes it right away if nothing is pending.
                asyncSendsCompletion = connection.newProviderFuture();

                if (asyncSendsCompletion != null) {
                    asyncCompleter.execute(() -> {
                        if (asyncSendQueue.isEmpty()) {
                            asyncSendsCompletion.onSuccess();
                        }
                    });
                }
            }

            try {
                if (asyncSendsCompletion != null) {
                    asyncSendsCompletion.sync(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
                }
            } catch (Exception waitError) {
                LOG.trace("Exception during wait for asynchronous sends to complete", waitError);
            } finally {
                // Without a supplied cause, a generic error is used for any stragglers.
                final Throwable failure = (cause != null)
                    ? cause
                    : new JMSException("Session closed remotely before message transfer result was notified");

                // Final task: fail or complete whatever is still in the asynchronous send
                // queue before the executor is shut down below.
                asyncCompleter.execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(failure)));
            }
        } finally {
            asyncCompleter.shutdown();

            try {
                asyncCompleter.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException interrupted) {
                // NOTE(review): the interrupt status is not restored here; presumably the
                // caller manages it around this call - confirm before changing.
                LOG.trace("Session close awaiting send completions was interrupted");
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    } catch (Throwable error) {
        // Prefer the captured JMS failure; otherwise convert whatever escaped.
        if (firstFailure == null) {
            throw JmsExceptionSupport.create(error);
        }
        throw firstFailure;
    } finally {
        connection.removeSession(sessionInfo);
    }

    return hadListener;
}