in qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java [553:651]
public void destroy(final JmsResource resource, final AsyncResult request) throws ProviderException {
checkClosedOrFailed();
checkConnected();
serializer.execute(() -> {
try {
checkClosedOrFailed();
resource.visit(new JmsDefaultResourceVisitor() {
@Override
public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
final AmqpSession session = connection.getSession(sessionInfo.getId());
session.close(new AsyncResult() {
// TODO: bit of a hack, but works. Similarly below for locally initiated consumer close.
@Override
public void onSuccess() {
onComplete();
request.onSuccess();
}
@Override
public void onFailure(ProviderException result) {
onComplete();
request.onFailure(result);
}
@Override
public boolean isComplete() {
return request.isComplete();
}
void onComplete() {
// Mark the sessions resources closed, which in turn calls
// the subscription cleanup.
session.handleResourceClosure(AmqpProvider.this, null);
}
});
}
@Override
public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception {
AmqpSession session = connection.getSession(producerInfo.getParentId());
AmqpProducer producer = session.getProducer(producerInfo);
producer.close(request);
}
@Override
public void processConsumerInfo(final JmsConsumerInfo consumerInfo) throws Exception {
AmqpSession session = connection.getSession(consumerInfo.getParentId());
final AmqpConsumer consumer = session.getConsumer(consumerInfo);
consumer.close(new AsyncResult() {
// TODO: bit of a hack, but works. Similarly above for locally initiated session close.
@Override
public void onSuccess() {
onComplete();
request.onSuccess();
}
@Override
public void onFailure(ProviderException result) {
onComplete();
request.onFailure(result);
}
@Override
public boolean isComplete() {
return request.isComplete();
}
void onComplete() {
connection.getSubTracker().consumerRemoved(consumerInfo);
}
});
}
@Override
public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
connection.close(request);
}
@Override
public void processDestination(JmsTemporaryDestination destination) throws Exception {
AmqpTemporaryDestination temporary = connection.getTemporaryDestination(destination);
if (temporary != null) {
temporary.close(request);
} else {
LOG.debug("Could not find temporary destination {} to delete.", destination);
request.onSuccess();
}
}
});
pumpToProtonTransport(request);
} catch (Throwable t) {
request.onFailure(ProviderExceptionSupport.createNonFatalOrPassthrough(t));
}
});
}