in flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/HttpHandler.scala [43:66]
override def channelReadComplete(ctx: ChannelHandlerContext): Unit = ctx.flush
override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = {
msg match {
case req: HttpRequest =>
if (HttpUtil.is100ContinueExpected(req)) {
ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE))
}
val keepAlive: Boolean = HttpUtil.isKeepAlive(req)
if (!keepAlive) {
ctx.writeAndFlush(buildResponse()).addListener(ChannelFutureListener.CLOSE)
} else {
val decoder = new QueryStringDecoder(req.uri)
val param: java.util.Map[String, java.util.List[String]] = decoder.parameters()
if (param.containsKey(paramKey)) {
sc.collect(param.get(paramKey).get(0))
}
ctx.writeAndFlush(buildResponse())
}
case x =>
logger.info("unsupported request format " + x)
}
}