in httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java [257:376]
// Requests a connection endpoint for the given route and state from the underlying pool.
//
// The returned future is a thin facade: its result comes from an internal BasicFuture that
// also notifies the supplied callback, while cancellation is forwarded to the pool's own
// lease future. Before the endpoint is handed out, a pooled connection is checked against the
// time-to-live and validate-after-inactivity settings resolved for the route.
public Future<AsyncConnectionEndpoint> lease(
        final String id,
        final HttpRoute route,
        final Object state,
        final Timeout requestTimeout,
        final FutureCallback<AsyncConnectionEndpoint> callback) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("{} endpoint lease request ({}) {}", id, requestTimeout, ConnPoolSupport.formatStats(route, state, pool));
    }
    return new Future<AsyncConnectionEndpoint>() {

        // Declaration order matters: the pool callback below reads both of these fields,
        // so they are initialized before pool.lease(...) is invoked.
        final ConnectionConfig connectionConfig = resolveConnectionConfig(route);
        final BasicFuture<AsyncConnectionEndpoint> resultFuture = new BasicFuture<>(callback);

        final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
                route,
                state,
                requestTimeout,
                new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {

                    @Override
                    public void completed(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry) {
                        discardIfPastTimeToLive(entry);
                        // When a ping has been submitted the ping handler finishes the lease.
                        if (!validationDeferred(entry)) {
                            leaseCompleted(entry);
                        }
                    }

                    // Drops the pooled connection when the configured time-to-live is zero
                    // or has elapsed since the entry was created.
                    void discardIfPastTimeToLive(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry) {
                        if (!entry.hasConnection()) {
                            return;
                        }
                        final TimeValue ttl = connectionConfig.getTimeToLive();
                        if (TimeValue.isNonNegative(ttl)
                                && (ttl.getDuration() == 0
                                        || Deadline.calculate(entry.getCreated(), ttl).isExpired())) {
                            entry.discardConnection(CloseMode.GRACEFUL);
                        }
                    }

                    // Applies the validate-after-inactivity policy. Returns true only when a
                    // ping command was submitted, in which case the ping handler is
                    // responsible for calling leaseCompleted.
                    boolean validationDeferred(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry) {
                        if (!entry.hasConnection()) {
                            return false;
                        }
                        final ManagedAsyncClientConnection conn = entry.getConnection();
                        final TimeValue inactivity = connectionConfig.getValidateAfterInactivity();
                        if (!conn.isOpen() || !TimeValue.isNonNegative(inactivity)) {
                            return false;
                        }
                        final boolean validationDue = inactivity.getDuration() == 0
                                || Deadline.calculate(entry.getUpdated(), inactivity).isExpired();
                        if (!validationDue) {
                            return false;
                        }
                        final ProtocolVersion version = conn.getProtocolVersion();
                        if (version == null || !version.greaterEquals(HttpVersion.HTTP_2_0)) {
                            // NOTE(review): the connection reported isOpen() above, yet it is
                            // logged as closed and discarded here - confirm this is the intended
                            // handling for connections below HTTP/2.
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("{} connection {} is closed", id, ConnPoolSupport.getId(conn));
                            }
                            entry.discardConnection(CloseMode.IMMEDIATE);
                            return false;
                        }
                        // HTTP/2 or newer: submit a ping; a null or false outcome marks the connection stale.
                        conn.submitCommand(new PingCommand(new BasicPingHandler(pingOk -> {
                            if (!Boolean.TRUE.equals(pingOk)) {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("{} connection {} is stale", id, ConnPoolSupport.getId(conn));
                                }
                                entry.discardConnection(CloseMode.GRACEFUL);
                            }
                            leaseCompleted(entry);
                        })), Command.Priority.IMMEDIATE);
                        return true;
                    }

                    // Activates the connection (if the entry still holds one), wraps the entry
                    // in an endpoint and completes the result future with it.
                    void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> entry) {
                        final ManagedAsyncClientConnection conn = entry.getConnection();
                        if (conn != null) {
                            conn.activate();
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{} endpoint leased {}", id, ConnPoolSupport.formatStats(route, state, pool));
                        }
                        final AsyncConnectionEndpoint leased = new InternalConnectionEndpoint(entry);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{} acquired {}", id, ConnPoolSupport.getId(leased));
                        }
                        resultFuture.completed(leased);
                    }

                    @Override
                    public void failed(final Exception ex) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{} endpoint lease failed", id);
                        }
                        resultFuture.failed(ex);
                    }

                    @Override
                    public void cancelled() {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("{} endpoint lease cancelled", id);
                        }
                        resultFuture.cancel();
                    }

                });

        @Override
        public AsyncConnectionEndpoint get() throws InterruptedException, ExecutionException {
            return resultFuture.get();
        }

        @Override
        public AsyncConnectionEndpoint get(
                final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return resultFuture.get(timeout, unit);
        }

        // Cancellation is forwarded to the pool's lease future; the pool callback's
        // cancelled() is what cancels the result future.
        @Override
        public boolean cancel(final boolean mayInterruptIfRunning) {
            return leaseFuture.cancel(mayInterruptIfRunning);
        }

        @Override
        public boolean isDone() {
            return resultFuture.isDone();
        }

        @Override
        public boolean isCancelled() {
            return resultFuture.isCancelled();
        }

    };
}