private static StreamingHttpClient buildStreaming()

in servicetalk-http-netty/src/main/java/io/servicetalk/http/netty/DefaultSingleAddressHttpClientBuilder.java [251:345]


    private static <U, R> StreamingHttpClient buildStreaming(final HttpClientBuildContext<U, R> ctx) {
        final ReadOnlyHttpClientConfig roConfig = ctx.httpConfig().asReadOnly();
        final HttpExecutionContext executionContext = ctx.builder.executionContextBuilder.build();
        if (roConfig.h2Config() != null && roConfig.hasProxy()) {
            throw new IllegalStateException("Proxying is not yet supported with HTTP/2");
        }

        // Track resources that potentially need to be closed when an exception is thrown during buildStreaming
        final CompositeCloseable closeOnException = newCompositeCloseable();
        try {
            final Publisher<? extends Collection<? extends ServiceDiscovererEvent<R>>> sdEvents =
                    ctx.serviceDiscoverer(executionContext).discover(ctx.address());

            ConnectionFactoryFilter<R, FilterableStreamingHttpConnection> connectionFactoryFilter =
                    ctx.builder.connectionFactoryFilter;
            ExecutionStrategy connectionFactoryStrategy =
                    ctx.builder.strategyComputation.buildForConnectionFactory();

            final SslContext sslContext = roConfig.tcpConfig().sslContext();
            if (roConfig.hasProxy() && sslContext != null) {
                assert roConfig.connectAddress() != null;
                ConnectionFactoryFilter<R, FilterableStreamingHttpConnection> proxy =
                        new ProxyConnectConnectionFactoryFilter<>(roConfig.connectAddress());
                connectionFactoryFilter = proxy.append(connectionFactoryFilter);
                connectionFactoryStrategy = connectionFactoryStrategy.merge(proxy.requiredOffloads());
            }

            final HttpExecutionStrategy executionStrategy = executionContext.executionStrategy();
            // closed by the LoadBalancer
            final ConnectionFactory<R, LoadBalancedStreamingHttpConnection> connectionFactory;
            final StreamingHttpRequestResponseFactory reqRespFactory = defaultReqRespFactory(roConfig,
                    executionContext.bufferAllocator());

            if (roConfig.isH2PriorKnowledge()) {
                H2ProtocolConfig h2Config = roConfig.h2Config();
                assert h2Config != null;
                connectionFactory = new H2LBHttpConnectionFactory<>(roConfig, executionContext,
                        ctx.builder.connectionFilterFactory, reqRespFactory,
                        connectionFactoryStrategy, connectionFactoryFilter,
                        ctx.builder.loadBalancerFactory::toLoadBalancedConnection);
            } else if (roConfig.tcpConfig().preferredAlpnProtocol() != null) {
                H1ProtocolConfig h1Config = roConfig.h1Config();
                H2ProtocolConfig h2Config = roConfig.h2Config();
                connectionFactory = new AlpnLBHttpConnectionFactory<>(roConfig, executionContext,
                        ctx.builder.connectionFilterFactory, new AlpnReqRespFactoryFunc(
                                executionContext.bufferAllocator(),
                                h1Config == null ? null : h1Config.headersFactory(),
                                h2Config == null ? null : h2Config.headersFactory()),
                        connectionFactoryStrategy, connectionFactoryFilter,
                        ctx.builder.loadBalancerFactory::toLoadBalancedConnection);
            } else {
                H1ProtocolConfig h1Config = roConfig.h1Config();
                assert h1Config != null;
                connectionFactory = new PipelinedLBHttpConnectionFactory<>(roConfig, executionContext,
                        ctx.builder.connectionFilterFactory, reqRespFactory,
                        connectionFactoryStrategy, connectionFactoryFilter,
                        ctx.builder.loadBalancerFactory::toLoadBalancedConnection);
            }

            final LoadBalancer<LoadBalancedStreamingHttpConnection> lb =
                    closeOnException.prepend(ctx.builder.loadBalancerFactory.newLoadBalancer(
                            targetAddress(ctx),
                            sdEvents,
                            connectionFactory));

            ContextAwareStreamingHttpClientFilterFactory currClientFilterFactory = ctx.builder.clientFilterFactory;
            if (roConfig.hasProxy() && sslContext == null) {
                // If we're talking to a proxy over http (not https), rewrite the request-target to absolute-form, as
                // specified by the RFC: https://tools.ietf.org/html/rfc7230#section-5.3.2
                currClientFilterFactory = appendFilter(currClientFilterFactory,
                        ctx.builder.proxyAbsoluteAddressFilterFactory());
            }
            if (ctx.builder.addHostHeaderFallbackFilter) {
                currClientFilterFactory = appendFilter(currClientFilterFactory, new HostHeaderHttpRequesterFilter(
                        ctx.builder.hostToCharSequenceFunction.apply(ctx.builder.address)));
            }

            FilterableStreamingHttpClient lbClient = closeOnException.prepend(
                    new LoadBalancedStreamingHttpClient(executionContext, lb, reqRespFactory));
            if (ctx.builder.retryingHttpRequesterFilter == null) {
                ctx.builder.retryingHttpRequesterFilter = DEFAULT_AUTO_RETRIES;
                currClientFilterFactory = appendFilter(currClientFilterFactory,
                        ctx.builder.retryingHttpRequesterFilter);
            }
            HttpExecutionStrategy computedStrategy = ctx.builder.strategyComputation.buildForClient(executionStrategy);
            LOGGER.debug("Client for {} created with base strategy {} → computed strategy {}",
                    targetAddress(ctx), executionStrategy, computedStrategy);
            return new FilterableClientToClient(currClientFilterFactory != null ?
                    currClientFilterFactory.create(lbClient, lb.eventStream(), ctx.sdStatus) :
                        lbClient, computedStrategy);
        } catch (final Throwable t) {
            closeOnException.closeAsync().subscribe();
            throw t;
        }
    }