in servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java [86:170]
public static <CC extends ConnectionContext> Single<ServerContext> bind(SocketAddress listenAddress,
final ReadOnlyTcpServerConfig config, final boolean autoRead, final ExecutionContext<?> executionContext,
@Nullable final InfluencerConnectionAcceptor connectionAcceptor,
final BiFunction<Channel, ConnectionObserver, Single<CC>> connectionFunction,
final Consumer<CC> connectionConsumer) {
requireNonNull(connectionFunction);
requireNonNull(connectionConsumer);
listenAddress = toNettyAddress(listenAddress);
EventLoopAwareNettyIoExecutor nettyIoExecutor = toEventLoopAwareNettyIoExecutor(executionContext.ioExecutor());
ServerBootstrap bs = new ServerBootstrap();
configure(config, autoRead, bs, nettyIoExecutor.eventLoopGroup(), listenAddress.getClass());
ChannelSet channelSet = new ChannelSet(
executionContext.executionStrategy().isCloseOffloaded() ? executionContext.executor() : immediate());
bs.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
// Verify that we do not leak pooled memory in the "accept" pipeline
if (msg instanceof ReferenceCounted) {
try {
throw new IllegalArgumentException("Unexpected ReferenceCounted msg in 'accept' pipeline: " +
msg);
} finally {
((ReferenceCounted) msg).release();
}
}
if (msg instanceof Channel) {
final Channel channel = (Channel) msg;
if (!channel.isActive()) {
channel.close();
LOGGER.debug("Channel ({}) is accepted, but was already inactive", msg);
return;
} else if (!channelSet.addIfAbsent(channel)) {
LOGGER.warn("Channel ({}) not added to ChannelSet", msg);
return;
}
}
ctx.fireChannelRead(msg);
}
});
bs.childHandler(new io.netty.channel.ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) {
Single<CC> connectionSingle = connectionFunction.apply(channel,
config.transportObserver().onNewConnection(channel.localAddress(), channel.remoteAddress()));
if (connectionAcceptor != null) {
connectionSingle = connectionSingle.flatMap(conn ->
// Defer is required to isolate context for ConnectionAcceptor#accept and the rest
// of connection processing.
defer(() -> connectionAcceptor.accept(conn).concat(succeeded(conn)))
// subscribeOn is required to offload calls to connectionAcceptor#accept
.subscribeOn(connectionAcceptor.requiredOffloads().isConnectOffloaded() ?
executionContext.executor() : immediate())
);
}
connectionSingle.beforeOnError(cause -> {
// Getting the remote-address may involve volatile reads and potentially a syscall, so guard it.
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Failed to create a connection for remote address {}", channel.remoteAddress(),
cause);
}
close(channel, cause);
}).subscribe(connectionConsumer);
}
});
ChannelFuture future = bs.bind(listenAddress);
return new SubscribableSingle<ServerContext>() {
@Override
protected void handleSubscribe(Subscriber<? super ServerContext> subscriber) {
subscriber.onSubscribe(() -> future.cancel(true));
future.addListener((ChannelFuture f) -> {
Channel channel = f.channel();
Throwable cause = f.cause();
if (cause == null) {
subscriber.onSuccess(NettyServerContext.wrap(channel, channelSet,
connectionAcceptor, executionContext));
} else {
close(channel, f.cause());
subscriber.onError(f.cause());
}
});
}
};
}