public Cancellable execute()

in httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java [254:431]


    public Cancellable execute(
            final AsyncClientExchangeHandler exchangeHandler,
            final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
            final HttpContext context) {
        final ComplexCancellable cancellable = new ComplexCancellable();
        try {
            if (!isRunning()) {
                throw new CancellationException("Request execution cancelled");
            }
            final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
            exchangeHandler.produceRequest((request, entityDetails, context1) -> {
                RequestConfig requestConfig = null;
                if (request instanceof Configurable) {
                    requestConfig = ((Configurable) request).getConfig();
                }
                if (requestConfig != null) {
                    clientContext.setRequestConfig(requestConfig);
                } else {
                    requestConfig = clientContext.getRequestConfig();
                }
                final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
                @SuppressWarnings("deprecation")
                final Timeout connectTimeout = requestConfig.getConnectTimeout();
                final Timeout responseTimeout = requestConfig.getResponseTimeout();
                final HttpHost target = new HttpHost(request.getScheme(), request.getAuthority());

                final Future<AsyncConnectionEndpoint> leaseFuture = leaseEndpoint(
                        target,
                        connectionRequestTimeout,
                        connectTimeout,
                        clientContext,
                        new FutureCallback<AsyncConnectionEndpoint>() {

                            @Override
                            public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
                                final InternalAsyncClientEndpoint endpoint = new InternalAsyncClientEndpoint(connectionEndpoint);
                                final AtomicInteger messageCountDown = new AtomicInteger(2);
                                final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {

                                    @Override
                                    public void releaseResources() {
                                        try {
                                            exchangeHandler.releaseResources();
                                        } finally {
                                            endpoint.releaseAndDiscard();
                                        }
                                    }

                                    @Override
                                    public void failed(final Exception cause) {
                                        try {
                                            exchangeHandler.failed(cause);
                                        } finally {
                                            endpoint.releaseAndDiscard();
                                        }
                                    }

                                    @Override
                                    public void cancel() {
                                        failed(new RequestFailedException("Request aborted"));
                                    }

                                    @Override
                                    public void produceRequest(
                                            final RequestChannel channel,
                                            final HttpContext context1) throws HttpException, IOException {
                                        channel.sendRequest(request, entityDetails, context1);
                                        if (entityDetails == null) {
                                            messageCountDown.decrementAndGet();
                                        }
                                    }

                                    @Override
                                    public int available() {
                                        return exchangeHandler.available();
                                    }

                                    @Override
                                    public void produce(final DataStreamChannel channel) throws IOException {
                                        exchangeHandler.produce(new DataStreamChannel() {

                                            @Override
                                            public void requestOutput() {
                                                channel.requestOutput();
                                            }

                                            @Override
                                            public int write(final ByteBuffer src) throws IOException {
                                                return channel.write(src);
                                            }

                                            @Override
                                            public void endStream(final List<? extends Header> trailers) throws IOException {
                                                channel.endStream(trailers);
                                                if (messageCountDown.decrementAndGet() <= 0) {
                                                    endpoint.releaseAndReuse();
                                                }
                                            }

                                            @Override
                                            public void endStream() throws IOException {
                                                channel.endStream();
                                                if (messageCountDown.decrementAndGet() <= 0) {
                                                    endpoint.releaseAndReuse();
                                                }
                                            }

                                        });
                                    }

                                    @Override
                                    public void consumeInformation(
                                            final HttpResponse response,
                                            final HttpContext context1) throws HttpException, IOException {
                                        exchangeHandler.consumeInformation(response, context1);
                                    }

                                    @Override
                                    public void consumeResponse(
                                            final HttpResponse response,
                                            final EntityDetails entityDetails,
                                            final HttpContext context1) throws HttpException, IOException {
                                        exchangeHandler.consumeResponse(response, entityDetails, context1);
                                        if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
                                            messageCountDown.decrementAndGet();
                                        }
                                        if (entityDetails == null) {
                                            if (messageCountDown.decrementAndGet() <= 0) {
                                                endpoint.releaseAndReuse();
                                            }
                                        }
                                    }

                                    @Override
                                    public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
                                        exchangeHandler.updateCapacity(capacityChannel);
                                    }

                                    @Override
                                    public void consume(final ByteBuffer src) throws IOException {
                                        exchangeHandler.consume(src);
                                    }

                                    @Override
                                    public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
                                        if (messageCountDown.decrementAndGet() <= 0) {
                                            endpoint.releaseAndReuse();
                                        }
                                        exchangeHandler.streamEnd(trailers);
                                    }

                                };
                                if (responseTimeout != null) {
                                    endpoint.setSocketTimeout(responseTimeout);
                                }
                                endpoint.execute(internalExchangeHandler, pushHandlerFactory, clientContext);
                            }

                            @Override
                            public void failed(final Exception ex) {
                                exchangeHandler.failed(ex);
                            }

                            @Override
                            public void cancelled() {
                                exchangeHandler.cancel();
                            }

                        });

                cancellable.setDependency(() -> leaseFuture.cancel(true));
            }, context);

        } catch (final HttpException | IOException | IllegalStateException ex) {
            exchangeHandler.failed(ex);
        }
        return cancellable;
    }