in flink-connector-netty/src/main/scala/org/apache/flink/streaming/connectors/netty/example/TcpServer.scala [62:102]
def startNettyServer(
portNotInUse: Int,
callbackUrl: Option[String]
): InetSocketAddress = synchronized {
if (!isRunning.get()) {
val server = new ServerBootstrap
val bootstrap: ServerBootstrap = server
.group(bossGroup, workerGroup)
.channel(classOf[NioServerSocketChannel])
.option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
.option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
tcpOpts(bootstrap)
val bootWithHandler = bootstrap
.handler(new LoggingHandler(logLevel))
.childHandler(new ChannelInitializer[SocketChannel]() {
def initChannel(ch: SocketChannel) {
val p: ChannelPipeline = ch.pipeline
p.addLast(new DelimiterBasedFrameDecoder(maxFrameLen, Delimiters.lineDelimiter(): _*))
p.addLast(new StringEncoder())
p.addLast(new StringDecoder())
p.addLast(new TcpHandler(ctx))
}
})
// Start the server.
val f: ChannelFuture = bootWithHandler.bind(portNotInUse)
f.syncUninterruptibly()
currentAddr = f.channel().localAddress().asInstanceOf[InetSocketAddress]
logger.info(s"start tcp server on address: $currentAddr")
isRunning.set(true)
register(currentAddr, callbackUrl)
f.channel().closeFuture().sync()
currentAddr
} else {
logger.info(s"server is running on address: $currentAddr, no need repeat start it")
currentAddr
}
}