public CompletableFuture performRequestAsync()

in example-transports/src/main/java/co/elastic/clients/transport/netty/NettyTransportHttpClient.java [109:211]


    public CompletableFuture<Response> performRequestAsync(String endpointId, Node node, Request request, TransportOptions options) {

        CompletableFuture<Response> promise = new CompletableFuture<Response>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                // TODO: cancel pending request
                return super.cancel(mayInterruptIfRunning);
            }
        };

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (node.uri().getScheme().equals("https")) {
                        pipeline.addLast(sslContext.newHandler(ch.alloc()));
                    }
                    pipeline.addLast(new HttpClientCodec());
                    pipeline.addLast(new ChannelHandler(node, promise));
                }
            });

        String uri = request.path();

        // If the node is not at the server root, prepend its path.
        String nodePath = node.uri().getRawPath();
        if (nodePath.length() > 1) {
            if (uri.charAt(0) == '/') {
                uri = uri.substring(1);
            }
            uri = nodePath + uri;
        }

        // Append query parameters
        String queryString = queryString(request, options);
        if (queryString != null) {
            uri = uri + "?" + queryString;
        }

        ByteBuf nettyBody;
        Iterable<ByteBuffer> body = request.body();
        if (body == null) {
            nettyBody = Unpooled.buffer(0);
        } else {
            List<ByteBuffer> bufs;
            if (body instanceof List) {
                bufs = (List<ByteBuffer>)body;
            } else {
                bufs = new ArrayList<>();
                for (ByteBuffer buf: body) {
                    bufs.add(buf);
                }
            }
            nettyBody = Unpooled.wrappedBuffer(bufs.toArray(new ByteBuffer[bufs.size()]));
        }

        FullHttpRequest nettyRequest = new DefaultFullHttpRequest(
            HttpVersion.HTTP_1_1,
            HttpMethod.valueOf(request.method()),
            uri,
            nettyBody
        );

        HttpHeaders nettyHeaders = nettyRequest.headers();
        // Netty doesn't set Content-Length automatically with FullRequest.
        nettyHeaders.set(HttpHeaderNames.CONTENT_LENGTH, nettyBody.readableBytes());

        int port = node.uri().getPort();
        if (port == -1) {
            port = node.uri().getScheme().equals("https") ? 443 : 80;
        }

        nettyHeaders.set(HttpHeaderNames.HOST, node.uri().getHost() + ":" + port);

        request.headers().forEach(nettyHeaders::set);
        options.headers().stream().forEach((kv) -> nettyHeaders.set(kv.getKey(), kv.getValue()));

        ChannelFuture future0 = bootstrap.connect(node.uri().getHost(), port);
        future0.addListener((ChannelFutureListener) future1 -> {
            if (checkSuccess(future1, promise)) {
                ChannelFuture future2 = future1.channel().writeAndFlush(nettyRequest);
                future2.addListener((ChannelFutureListener) future3 -> {
                    if (checkSuccess(future3, promise)) {
                        // Log request sent?
                    }
                });
            }
        });

        future0.addListener(future4 -> {
            if (future4.cause() != null) {
                promise.completeExceptionally(future4.cause());
            } else if (future4.isCancelled()) {
                promise.completeExceptionally(new RuntimeException("Request was cancelled"));
            }
        });

        return promise;
    }