public static Single bind()

in servicetalk-tcp-netty-internal/src/main/java/io/servicetalk/tcp/netty/internal/TcpServerBinder.java [86:170]


    /**
     * Binds a server socket and routes every accepted {@link Channel} through {@code connectionFunction}, the
     * optional {@code connectionAcceptor}, and finally {@code connectionConsumer}.
     * <p>
     * The bind is started when this method is called, not when the returned {@link Single} is subscribed to.
     * Subscribing attaches a listener to the pending bind; cancelling requests cancellation of the bind future.
     *
     * @param listenAddress the address to listen on; converted with {@code toNettyAddress} before binding.
     * @param config the server configuration passed to {@code configure(...)} and the source of the transport
     * observer notified for each new connection.
     * @param autoRead passed to {@code configure(...)} together with {@code config}.
     * @param executionContext supplies the I/O executor, and the executor used when close or connection-accept
     * calls are offloaded.
     * @param connectionAcceptor optional acceptor invoked for each created connection; may be {@code null}.
     * @param connectionFunction creates the connection for an accepted {@link Channel}.
     * @param connectionConsumer subscribed to the resulting connection {@link Single} of each accepted channel.
     * @param <CC> the type of connection produced by {@code connectionFunction}.
     * @return a {@link Single} that completes with the {@link ServerContext} when the bind succeeds, or terminates
     * with the cause reported by the bind future.
     */
    public static <CC extends ConnectionContext> Single<ServerContext> bind(SocketAddress listenAddress,
            final ReadOnlyTcpServerConfig config, final boolean autoRead, final ExecutionContext<?> executionContext,
            @Nullable final InfluencerConnectionAcceptor connectionAcceptor,
            final BiFunction<Channel, ConnectionObserver, Single<CC>> connectionFunction,
            final Consumer<CC> connectionConsumer) {
        requireNonNull(connectionFunction);
        requireNonNull(connectionConsumer);
        final SocketAddress nettyAddress = toNettyAddress(listenAddress);
        final EventLoopAwareNettyIoExecutor ioExecutor =
                toEventLoopAwareNettyIoExecutor(executionContext.ioExecutor());
        final ServerBootstrap bootstrap = new ServerBootstrap();
        configure(config, autoRead, bootstrap, ioExecutor.eventLoopGroup(), nettyAddress.getClass());

        // Tracks accepted channels; the executor choice depends on whether close is offloaded.
        final ChannelSet acceptedChannels = new ChannelSet(
                executionContext.executionStrategy().isCloseOffloaded() ? executionContext.executor() : immediate());

        // Handler for the "accept" (parent) pipeline: filters what is forwarded to the child initializer.
        bootstrap.handler(new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
                // Reference-counted messages are not expected here: release so nothing leaks, and fail loudly.
                if (msg instanceof ReferenceCounted) {
                    final ReferenceCounted unexpected = (ReferenceCounted) msg;
                    try {
                        throw new IllegalArgumentException("Unexpected ReferenceCounted msg in 'accept' pipeline: " +
                                msg);
                    } finally {
                        unexpected.release();
                    }
                }
                if (msg instanceof Channel) {
                    final Channel accepted = (Channel) msg;
                    // Drop channels that became inactive before we got to process them.
                    if (!accepted.isActive()) {
                        accepted.close();
                        LOGGER.debug("Channel ({}) is accepted, but was already inactive", msg);
                        return;
                    }
                    // Do not propagate channels that the set refused to track.
                    if (!acceptedChannels.addIfAbsent(accepted)) {
                        LOGGER.warn("Channel ({}) not added to ChannelSet", msg);
                        return;
                    }
                }
                ctx.fireChannelRead(msg);
            }
        });

        // Initializer for each accepted (child) channel: builds the connection and hands it to the consumer.
        bootstrap.childHandler(new io.netty.channel.ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel channel) {
                final ConnectionObserver observer = config.transportObserver()
                        .onNewConnection(channel.localAddress(), channel.remoteAddress());
                Single<CC> pendingConnection = connectionFunction.apply(channel, observer);
                if (connectionAcceptor != null) {
                    pendingConnection = pendingConnection.flatMap(conn -> {
                        // Defer is required to isolate context for ConnectionAcceptor#accept and the rest
                        // of connection processing.
                        // subscribeOn is required to offload calls to connectionAcceptor#accept
                        return defer(() -> connectionAcceptor.accept(conn).concat(succeeded(conn)))
                                .subscribeOn(connectionAcceptor.requiredOffloads().isConnectOffloaded() ?
                                        executionContext.executor() : immediate());
                    });
                }
                pendingConnection.beforeOnError(cause -> {
                    // Getting the remote-address may involve volatile reads and potentially a syscall, so guard it.
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Failed to create a connection for remote address {}", channel.remoteAddress(),
                                cause);
                    }
                    close(channel, cause);
                }).subscribe(connectionConsumer);
            }
        });

        // The bind starts here; subscribers of the returned Single only observe its outcome.
        final ChannelFuture bindFuture = bootstrap.bind(nettyAddress);
        return new SubscribableSingle<ServerContext>() {
            @Override
            protected void handleSubscribe(Subscriber<? super ServerContext> subscriber) {
                subscriber.onSubscribe(() -> bindFuture.cancel(true));
                bindFuture.addListener((ChannelFuture completed) -> {
                    final Channel serverChannel = completed.channel();
                    final Throwable failure = completed.cause();
                    if (failure != null) {
                        close(serverChannel, failure);
                        subscriber.onError(failure);
                        return;
                    }
                    subscriber.onSuccess(NettyServerContext.wrap(serverChannel, acceptedChannels,
                            connectionAcceptor, executionContext));
                });
            }
        };
    }