in qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java [331:409]
public void close() {
    // Shuts this provider down. Only the first caller to flip the 'closed' flag does any
    // work; later calls fall straight through. The protocol level close is queued on the
    // serializer, the calling thread then waits for it to complete (bounded by
    // getCloseTimeout() unless that value is negative, in which case the untimed sync()
    // is used), and finally the transport is closed regardless of how the wait ended.
    if (closed.compareAndSet(false, true)) {
        final ProviderFuture request = futureFactory.createUnfailableFuture();

        // Possible that the connect call failed before calling transport connect or the connect
        // call failed and shutdown the event loop in which case we have no work to do other than
        // to clean up the transport by closing it down.
        if (serializer != null && !serializer.isShutdown()) {
            try {
                serializer.execute(() -> {
                    try {
                        // If we are not connected then there is nothing we can do now
                        // just signal success.
                        if (transport == null || !transport.isConnected()) {
                            request.onSuccess();
                            return;
                        }

                        if (connection != null) {
                            connection.close(request);
                        } else {
                            // If the SASL authentication occurred but failed then we don't
                            // need to do an open / close
                            if (authenticator != null && (!authenticator.isComplete() || !authenticator.wasSuccessful())) {
                                request.onSuccess();
                                return;
                            }

                            // Connection attempt might have been tried and failed so only perform
                            // an open / close cycle if one hasn't been done already.
                            if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED) {
                                AmqpClosedConnectionBuilder builder = new AmqpClosedConnectionBuilder(getProvider(), connectionInfo);
                                builder.buildResource(request);
                                protonConnection.setContext(builder);
                            } else {
                                request.onSuccess();
                            }
                        }

                        pumpToProtonTransport(request);
                    } catch (Exception e) {
                        LOG.debug("Caught exception while closing proton connection: {}", e.getMessage());
                        // The close attempt failed part way through, so nothing else may ever
                        // complete the request. Complete it here so the thread waiting in
                        // sync() below is released instead of blocking for the whole close
                        // timeout (or without limit when the timeout is negative).
                        // NOTE(review): assumes completing an already completed future is a
                        // harmless no-op - confirm against ProviderFuture.
                        request.onSuccess();
                    } finally {
                        // Runs on every exit path of the task, including the early returns above.
                        if (nextIdleTimeoutCheck != null) {
                            LOG.trace("Cancelling scheduled IdleTimeoutCheck");
                            nextIdleTimeoutCheck.cancel(false);
                            nextIdleTimeoutCheck = null;
                        }
                    }
                });
            } catch (RejectedExecutionException rje) {
                // Transport likely encountered some critical error on connect and the executor
                // resource is not initialized now, in which case just ignore and continue on.
                LOG.trace("Close of provider resources was rejected from Transport IO thread: ", rje);
                request.onSuccess();
            }
        } else {
            // No usable serializer, so there is no close task to wait for.
            request.onSuccess();
        }

        try {
            // A negative close timeout selects the untimed wait.
            if (getCloseTimeout() < 0) {
                request.sync();
            } else {
                request.sync(getCloseTimeout(), TimeUnit.MILLISECONDS);
            }
        } catch (ProviderException e) {
            LOG.warn("Error caught while closing Provider: {}", e.getMessage() != null ? e.getMessage() : "<Unknown Error>");
        } finally {
            // Always release the transport, whether or not the wait above succeeded.
            if (transport != null) {
                try {
                    transport.close();
                } catch (Exception e) {
                    LOG.debug("Caught exception while closing down Transport: {}", e.getMessage());
                }
            }
        }
    }
}