in httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/HttpAsyncRequester.java [277:383]
public void execute(
final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final Timeout timeout,
final HttpContext executeContext) {
Args.notNull(exchangeHandler, "Exchange handler");
Args.notNull(timeout, "Timeout");
Args.notNull(executeContext, "Context");
try {
exchangeHandler.produceRequest((request, entityDetails, requestContext) -> {
final String scheme = request.getScheme();
final URIAuthority authority = request.getAuthority();
if (authority == null) {
throw new ProtocolException("Request authority not specified");
}
final HttpHost target = new HttpHost(scheme, authority);
connect(target, timeout, null, new FutureCallback<AsyncClientEndpoint>() {
@Override
public void completed(final AsyncClientEndpoint endpoint) {
endpoint.execute(new AsyncClientExchangeHandler() {
@Override
public void releaseResources() {
endpoint.releaseAndDiscard();
exchangeHandler.releaseResources();
}
@Override
public void failed(final Exception cause) {
endpoint.releaseAndDiscard();
exchangeHandler.failed(cause);
}
@Override
public void cancel() {
endpoint.releaseAndDiscard();
exchangeHandler.cancel();
}
@Override
public void produceRequest(final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException {
channel.sendRequest(request, entityDetails, httpContext);
}
@Override
public int available() {
return exchangeHandler.available();
}
@Override
public void produce(final DataStreamChannel channel) throws IOException {
exchangeHandler.produce(channel);
}
@Override
public void consumeInformation(final HttpResponse response, final HttpContext httpContext) throws HttpException, IOException {
exchangeHandler.consumeInformation(response, httpContext);
}
@Override
public void consumeResponse(
final HttpResponse response, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException {
if (entityDetails == null) {
endpoint.releaseAndReuse();
}
exchangeHandler.consumeResponse(response, entityDetails, httpContext);
}
@Override
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
exchangeHandler.updateCapacity(capacityChannel);
}
@Override
public void consume(final ByteBuffer src) throws IOException {
exchangeHandler.consume(src);
}
@Override
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
endpoint.releaseAndReuse();
exchangeHandler.streamEnd(trailers);
}
}, pushHandlerFactory, executeContext);
}
@Override
public void failed(final Exception ex) {
exchangeHandler.failed(ex);
}
@Override
public void cancelled() {
exchangeHandler.cancel();
}
});
}, executeContext);
} catch (final IOException | HttpException ex) {
exchangeHandler.failed(ex);
}
}