public void run()

in src/main/java/com/uber/rss/StreamServer.java [188:249]


    /**
     * Starts the server: binds the shuffle port, registers this server with the service registry,
     * optionally binds the HTTP port, and schedules an asynchronous cleanup of old files.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Build the bootstraps for the shuffle (stream) channel and the HTTP channel.</li>
     *   <li>Bind the shuffle port and remember the bound channel and the actual port.</li>
     *   <li>If no service registry has been set and the configured registry type is standalone,
     *       create a standalone registry client that points at this server's own shuffle port.</li>
     *   <li>Register this server (data center, cluster, server id, version, host:port).</li>
     *   <li>Bind the HTTP port unless the configured HTTP port is {@code -1}.</li>
     *   <li>When the configured storage is a {@code ShuffleFileStorage}, start a background
     *       cleanup of old files under the configured root directory.</li>
     *   <li>Increment the {@code serverStart} counter.</li>
     * </ol>
     *
     * <p>Every bound channel is added to {@code channels} so that it can be closed later.
     *
     * @throws InterruptedException declared by this method; presumably propagated from
     *                              {@code bindPort} - confirm against that helper
     * @throws BindException        declared by this method; presumably raised by {@code bindPort}
     *                              when a port cannot be bound - confirm against that helper
     */
    public void run() throws InterruptedException, BindException {
        logger.info(String.format("Number of opened files: %s", SystemUtils.getFileDescriptorCount()));

        String serverId = getServerId();

        // A supplier is used so that a fresh handler array is produced on every invocation.
        Supplier<ChannelHandler[]> streamHandlers = () -> new ChannelHandler[]{
            new StreamServerVersionDecoder(serverId, runningVersion, serverConfig.getIdleTimeoutMillis(),
                    shuffleExecutor, channelManager, serverDetailCollection)
        };
        ServerBootstrap streamServerBootstrap = bootstrapChannel(shuffleBossGroup, shuffleWorkerGroup,
                serverConfig.getNetworkBacklog(), serverConfig.getNetworkTimeout(), streamHandlers);

        final int httpBacklog = 32; // HTTP is only used for health check, thus good with small
                                    // value and not configurable
        Supplier<ChannelHandler[]> httpHandlers = () -> new ChannelHandler[]{
            new HttpServerCodec(),
            new HttpObjectAggregator(512 * 1024),
            new HttpChannelInboundHandler()
        };
        // The health check endpoint gets three times the regular network timeout.
        ServerBootstrap httpBootstrap = bootstrapChannel(healthCheckEventLoopGroup, healthCheckEventLoopGroup,
                httpBacklog, serverConfig.getNetworkTimeout()*3, httpHandlers);

        // Bind the ports and save the results so that the channels can be closed later. If the second bind fails,
        // the first one gets cleaned up in the shutdown.
        Pair<Channel, Integer> channelAndPort = bindPort(streamServerBootstrap, serverConfig.getShufflePort());
        channels.add(channelAndPort.getKey());
        shufflePort = channelAndPort.getValue();
        logger.info(String.format("ShuffleServer: %s:%s", hostName, shufflePort));

        // The standalone registry client connects back to this very server, so it can only be
        // created once the actual shuffle port is known.
        if (this.serviceRegistry == null && serverConfig.getServiceRegistryType().
                equalsIgnoreCase(ServiceRegistry.TYPE_STANDALONE)) {
            logger.info(String.format("Creating registry client connecting to local stream server: %s:%s",
                    hostName, shufflePort));
            this.serviceRegistry = new StandaloneServiceRegistryClient(this.hostName, shufflePort,
                    serverConfig.getNetworkTimeout(), "streamServer");
        }
        String dataCenter = serverConfig.getDataCenterOrDefault();
        String cluster = serverConfig.getClusterOrDefault();
        String hostAndPort = String.format("%s:%s", hostName, shufflePort);
        logger.info(String.format("Registering shuffle server, data center: %s, cluster: %s, server id: %s, " +
                "host and port: %s", dataCenter, cluster, serverId, hostAndPort));
        // NOTE(review): serviceRegistry is dereferenced unconditionally here; this assumes it was
        // already provided elsewhere for every non-standalone registry type - confirm.
        this.serviceRegistry.registerServer(dataCenter, cluster, serverId, runningVersion, hostAndPort);

        if (serverConfig.getHttpPort() != -1) {
            channelAndPort = bindPort(httpBootstrap, serverConfig.getHttpPort());
            channels.add(channelAndPort.getKey());
            httpPort = channelAndPort.getValue();
            logger.info(String.format("HttpServer: %s:%s", hostName, httpPort));
        } else {
            // No HTTP endpoint is bound; httpPort simply mirrors the configured -1.
            httpPort = serverConfig.getHttpPort();
        }

        if (serverConfig.getStorage() instanceof ShuffleFileStorage) {
            // Best-effort background cleanup: it must not block or fail the server start, but a
            // failure is logged instead of being dropped together with the discarded future.
            CompletableFuture.runAsync(() -> {
                // The cutoff timestamp is computed when the task actually runs.
                FileUtils.cleanupOldFiles(serverConfig.getRootDirectory(), System.currentTimeMillis()
                        - serverConfig.getAppFileRetentionMillis());
            }).whenComplete((ignored, error) -> {
                if (error != null) {
                    logger.warn(String.format("Failed to clean up old files under %s",
                            serverConfig.getRootDirectory()), error);
                }
            });
        }

        M3Stats.getDefaultScope().counter("serverStart").inc(1);
    }