private void createTunnel()

in httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncConnectExec.java [379:515]


    private void createTunnel(
            final State state,
            final HttpHost proxy,
            final HttpHost nextHop,
            final AsyncExecChain.Scope scope,
            final AsyncExecCallback asyncExecCallback) {

        final CancellableDependency operation = scope.cancellableDependency;
        final HttpClientContext clientContext = scope.clientContext;
        final AsyncExecRuntime execRuntime = scope.execRuntime;
        final String exchangeId = scope.exchangeId;

        final AuthExchange proxyAuthExchange = proxy != null ? clientContext.getAuthExchange(proxy) : new AuthExchange();

        if (authCacheKeeper != null) {
            authCacheKeeper.loadPreemptively(proxy, null, proxyAuthExchange, clientContext);
        }

        final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {

            private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();

            @Override
            public void releaseResources() {
                final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
                if (entityConsumer != null) {
                    entityConsumer.releaseResources();
                }
            }

            @Override
            public void failed(final Exception cause) {
                final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
                if (entityConsumer != null) {
                    entityConsumer.releaseResources();
                }
                asyncExecCallback.failed(cause);
            }

            @Override
            public void cancel() {
                failed(new InterruptedIOException());
            }

            @Override
            public void produceRequest(final RequestChannel requestChannel,
                                       final HttpContext httpContext) throws HttpException, IOException {
                final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, nextHop, nextHop.toHostString());
                connect.setVersion(HttpVersion.HTTP_1_1);

                proxyHttpProcessor.process(connect, null, clientContext);
                authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);

                requestChannel.sendRequest(connect, null, clientContext);
            }

            @Override
            public void produce(final DataStreamChannel dataStreamChannel) throws IOException {
            }

            @Override
            public int available() {
                return 0;
            }

            @Override
            public void consumeInformation(final HttpResponse httpResponse,
                                           final HttpContext httpContext) throws HttpException, IOException {
            }

            @Override
            public void consumeResponse(final HttpResponse response,
                                        final EntityDetails entityDetails,
                                        final HttpContext httpContext) throws HttpException, IOException {
                clientContext.setResponse(response);
                proxyHttpProcessor.process(response, entityDetails, clientContext);

                final int status = response.getCode();
                if (status < HttpStatus.SC_SUCCESS) {
                    throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
                }

                if (needAuthentication(proxyAuthExchange, proxy, response, clientContext)) {
                    state.challenged = true;
                } else {
                    state.challenged = false;
                    if (status >= HttpStatus.SC_REDIRECTION) {
                        state.tunnelRefused = true;
                        entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
                    } else if (status == HttpStatus.SC_OK) {
                        clientContext.setProtocolVersion(null);
                        asyncExecCallback.completed();
                    } else {
                        throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
                    }
                }
            }

            @Override
            public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
                final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
                if (entityConsumer != null) {
                    entityConsumer.updateCapacity(capacityChannel);
                } else {
                    capacityChannel.update(Integer.MAX_VALUE);
                }
            }

            @Override
            public void consume(final ByteBuffer src) throws IOException {
                final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
                if (entityConsumer != null) {
                    entityConsumer.consume(src);
                }
            }

            @Override
            public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
                final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
                if (entityConsumer != null) {
                    entityConsumer.streamEnd(trailers);
                }
                asyncExecCallback.completed();
            }

        };

        if (LOG.isDebugEnabled()) {
            operation.setDependency(execRuntime.execute(
                    exchangeId,
                    new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
                    clientContext));
        } else {
            operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
        }

    }