in example-transports/src/main/java/co/elastic/clients/transport/netty/NettyTransportHttpClient.java [109:211]
/**
 * Sends {@code request} to {@code node} asynchronously over a newly opened Netty channel.
 * <p>
 * Each call configures a fresh {@link Bootstrap} on the shared worker group, connects to the node's
 * host and port, and writes a single HTTP/1.1 request once the connection is established. The returned
 * future is handed to the pipeline's {@code ChannelHandler}, which is expected to complete it with the
 * response; connection and write failures complete it exceptionally.
 *
 * @param endpointId identifier of the endpoint being called (not used by this implementation)
 * @param node the target node; its URI provides scheme, host, port and an optional base path
 * @param request the request to send (method, path, headers and optional body)
 * @param options transport options contributing additional headers (and, via {@code queryString},
 *                the query string)
 * @return a future for the response; cancelling it does not yet abort the in-flight request
 */
public CompletableFuture<Response> performRequestAsync(String endpointId, Node node, Request request, TransportOptions options) {

    CompletableFuture<Response> promise = new CompletableFuture<Response>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            // TODO: cancel pending request
            return super.cancel(mayInterruptIfRunning);
        }
    };

    // URI schemes are case-insensitive and getScheme() may be null, hence the constant-first comparison.
    final boolean secure = "https".equalsIgnoreCase(node.uri().getScheme());

    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(workerGroup)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (secure) {
                    pipeline.addLast(sslContext.newHandler(ch.alloc()));
                }
                pipeline.addLast(new HttpClientCodec());
                pipeline.addLast(new ChannelHandler(node, promise));
            }
        });

    String uri = request.path();

    // If the node is not at the server root, prepend its path, making sure that exactly one '/'
    // separates the node path from the request path.
    String nodePath = node.uri().getRawPath();
    if (nodePath != null && nodePath.length() > 1) {
        if (uri.startsWith("/")) {
            uri = uri.substring(1);
        }
        uri = nodePath.endsWith("/") ? nodePath + uri : nodePath + "/" + uri;
    }

    // Append query parameters
    String queryString = queryString(request, options);
    if (queryString != null) {
        uri = uri + "?" + queryString;
    }

    // Wrap the body buffers (no copy of their contents) into a single Netty buffer.
    ByteBuf nettyBody;
    Iterable<ByteBuffer> body = request.body();
    if (body == null) {
        nettyBody = Unpooled.buffer(0);
    } else {
        List<ByteBuffer> bufs = new ArrayList<>();
        for (ByteBuffer buf : body) {
            bufs.add(buf);
        }
        nettyBody = Unpooled.wrappedBuffer(bufs.toArray(new ByteBuffer[0]));
    }

    FullHttpRequest nettyRequest = new DefaultFullHttpRequest(
        HttpVersion.HTTP_1_1,
        HttpMethod.valueOf(request.method()),
        uri,
        nettyBody
    );

    HttpHeaders nettyHeaders = nettyRequest.headers();
    // Netty doesn't set Content-Length automatically with FullRequest.
    nettyHeaders.set(HttpHeaderNames.CONTENT_LENGTH, nettyBody.readableBytes());

    int port = node.uri().getPort();
    if (port == -1) {
        // No explicit port in the node URI: fall back to the scheme's default.
        port = secure ? 443 : 80;
    }
    nettyHeaders.set(HttpHeaderNames.HOST, node.uri().getHost() + ":" + port);

    // Request headers first, then transport options, so that options win on conflicts.
    request.headers().forEach(nettyHeaders::set);
    options.headers().stream().forEach((kv) -> nettyHeaders.set(kv.getKey(), kv.getValue()));

    ChannelFuture connectFuture = bootstrap.connect(node.uri().getHost(), port);

    connectFuture.addListener((ChannelFutureListener) connected -> {
        if (checkSuccess(connected, promise)) {
            ChannelFuture writeFuture = connected.channel().writeAndFlush(nettyRequest);
            writeFuture.addListener((ChannelFutureListener) written -> {
                if (checkSuccess(written, promise)) {
                    // Log request sent?
                }
            });
        } else {
            // The request was never handed to a channel, so nothing else will release its buffer.
            nettyRequest.release();
        }
    });

    // NOTE(review): this looks redundant with checkSuccess() above, which presumably also fails the
    // promise; kept because completing an already-completed future is a no-op.
    connectFuture.addListener(connectResult -> {
        if (connectResult.cause() != null) {
            promise.completeExceptionally(connectResult.cause());
        } else if (connectResult.isCancelled()) {
            promise.completeExceptionally(new RuntimeException("Request was cancelled"));
        }
    });

    return promise;
}