in pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/http/NettyHttpServerHandler.java [73:101]
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
    // Handles one inbound HTTP message part:
    //  - an HttpRequest is stored in this.request and, if the client expects it,
    //    answered with a 100-continue;
    //  - an HttpContent with readable bytes is forwarded to nettySource as a NettyHttpRecord;
    //  - a LastHttpContent additionally triggers writeResponse and, when that returns
    //    false, closes the connection.
    // The two top-level checks are deliberately independent (not else-if): a single
    // message object may be both an HttpRequest and an HttpContent.
    if (msg instanceof HttpRequest) {
        HttpRequest request = this.request = (HttpRequest) msg;
        if (HttpUtil.is100ContinueExpected(request)) {
            send100Continue(ctx);
        }
    }

    if (msg instanceof HttpContent) {
        HttpContent httpContent = (HttpContent) msg;
        ByteBuf content = httpContent.content();
        if (content.isReadable()) {
            // Encode with the same charset that was used for decoding. The no-arg
            // String.getBytes() uses the JVM's default charset, which would corrupt
            // non-ASCII payloads whenever that default is not UTF-8.
            byte[] payload = content.toString(CharsetUtil.UTF_8).getBytes(CharsetUtil.UTF_8);
            // The record key is always the empty string.
            nettySource.consume(new NettyHttpRecord(Optional.of(""), payload));
        }

        if (msg instanceof LastHttpContent) {
            LastHttpContent trailer = (LastHttpContent) msg;
            if (!writeResponse(trailer, ctx)) {
                // If keep-alive is off, close the connection once the content is fully written.
                ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
            }
        }
    }
}