override def channelReadComplete()

in flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/HttpHandler.scala [43:66]


  override def channelReadComplete(ctx: ChannelHandlerContext): Unit = ctx.flush

  override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit = {
    msg match {
      case req: HttpRequest =>
        if (HttpUtil.is100ContinueExpected(req)) {
          ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE))
        }

        val keepAlive: Boolean = HttpUtil.isKeepAlive(req)
        if (!keepAlive) {
          ctx.writeAndFlush(buildResponse()).addListener(ChannelFutureListener.CLOSE)
        } else {
          val decoder = new QueryStringDecoder(req.uri)
          val param: java.util.Map[String, java.util.List[String]] = decoder.parameters()
          if (param.containsKey(paramKey)) {
            sc.collect(param.get(paramKey).get(0))
          }
          ctx.writeAndFlush(buildResponse())
        }
      case x =>
        logger.info("unsupported request format " + x)
    }
  }