in qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java [1442:1542]
public void onResourceClosed(final JmsResource resource, final ProviderException cause) {
// Closure of the Connection itself is notified via onConnectionFailure
// Run on the connection executor to free the provider to go do more work and avoid
// any chance of a deadlock if the code ever looped back to the provider.
if (!closing.get() && !closed.get()) {
// Set the failure cause indicator now to more quickly reflect the correct
// state in the resource. The actual shutdown and clean will be done on the
// connection executor thread to avoid looping or stalling the provider thread.
if (resource instanceof JmsSessionInfo) {
JmsSession session = sessions.get(resource.getId());
if (session != null) {
session.setFailureCause(cause);
}
} else if (resource instanceof JmsProducerInfo) {
JmsSessionId parentId = ((JmsProducerInfo) resource).getParentId();
JmsSession session = sessions.get(parentId);
if (session != null) {
JmsMessageProducer producer = session.lookup((JmsProducerId) resource.getId());
if (producer != null) {
producer.setFailureCause(cause);
}
}
} else if (resource instanceof JmsConsumerInfo) {
JmsConsumerInfo consumerInfo = (JmsConsumerInfo) resource;
if (consumerInfo.isConnectionConsumer()) {
JmsConnectionConsumer consumer = connectionConsumers.get(consumerInfo.getId());
if (consumer != null) {
consumer.setFailureCause(cause);
}
} else {
JmsSessionId parentId = consumerInfo.getParentId();
JmsSession session = sessions.get(parentId);
if (session != null) {
JmsMessageConsumer consumer = session.lookup((JmsConsumerId) resource.getId());
if (consumer != null) {
consumer.setFailureCause(cause);
}
}
}
}
executor.execute(new Runnable() {
@Override
public void run() {
if (resource instanceof JmsSessionInfo) {
JmsSession session = sessions.get(resource.getId());
if (session != null) {
session.sessionClosed(cause);
for (JmsConnectionListener listener : connectionListeners) {
listener.onSessionClosed(session, cause);
}
}
} else if (resource instanceof JmsProducerInfo) {
JmsSessionId parentId = ((JmsProducerInfo) resource).getParentId();
JmsSession session = sessions.get(parentId);
if (session != null) {
JmsMessageProducer producer = session.producerClosed((JmsProducerInfo) resource, cause);
if (producer != null) {
for (JmsConnectionListener listener : connectionListeners) {
listener.onProducerClosed(producer, cause);
}
}
}
} else if (resource instanceof JmsConsumerInfo) {
JmsConsumerInfo consumerInfo = (JmsConsumerInfo) resource;
if (consumerInfo.isConnectionConsumer()) {
JmsConnectionConsumer consumer = connectionConsumers.get(consumerInfo.getId());
if (consumer != null) {
try {
if (consumer != null) {
consumer.shutdown(cause);
}
} catch (Throwable error) {
LOG.trace("Ignoring exception thrown during cleanup of closed connection consumer", error);
}
onAsyncException(new JMSException("Connection Consumer remotely closed").initCause(cause));
}
} else {
JmsSessionId parentId = consumerInfo.getParentId();
JmsSession session = sessions.get(parentId);
if (session != null) {
JmsMessageConsumer consumer = session.consumerClosed((JmsConsumerInfo) resource, cause);
if (consumer != null) {
for (JmsConnectionListener listener : connectionListeners) {
listener.onConsumerClosed(consumer, cause);
}
}
}
}
} else {
LOG.info("A JMS resource has been remotely closed: {}", resource);
}
}
});
}
}