def startNettyServer()

in flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/TcpServer.scala [62:102]


  def startNettyServer(
    portNotInUse: Int,
    callbackUrl: Option[String]
  ): InetSocketAddress = synchronized {
    if (!isRunning.get()) {

      val server = new ServerBootstrap
      val bootstrap: ServerBootstrap = server
        .group(bossGroup, workerGroup)
        .channel(classOf[NioServerSocketChannel])
        .option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
        .option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)

      tcpOpts(bootstrap)

      val bootWithHandler = bootstrap
        .handler(new LoggingHandler(logLevel))
        .childHandler(new ChannelInitializer[SocketChannel]() {
          def initChannel(ch: SocketChannel) {
            val p: ChannelPipeline = ch.pipeline
            p.addLast(new DelimiterBasedFrameDecoder(maxFrameLen, Delimiters.lineDelimiter(): _*))
            p.addLast(new StringEncoder())
            p.addLast(new StringDecoder())
            p.addLast(new TcpHandler(ctx))
          }
        })

      // Start the server.
      val f: ChannelFuture = bootWithHandler.bind(portNotInUse)
      f.syncUninterruptibly()
      currentAddr = f.channel().localAddress().asInstanceOf[InetSocketAddress]
      logger.info(s"start tcp server on address: $currentAddr")
      isRunning.set(true)
      register(currentAddr, callbackUrl)
      f.channel().closeFuture().sync()
      currentAddr
    } else {
      logger.info(s"server is running on address: $currentAddr, no need repeat start it")
      currentAddr
    }
  }