in src/main/java/com/uber/rss/StreamServer.java [188:249]
public void run() throws InterruptedException, BindException {
logger.info(String.format("Number of opened files: %s", SystemUtils.getFileDescriptorCount()));
String serverId = getServerId();
Supplier<ChannelHandler[]> streamHandlers = () -> new ChannelHandler[]{
new StreamServerVersionDecoder(serverId, runningVersion, serverConfig.getIdleTimeoutMillis(),
shuffleExecutor, channelManager, serverDetailCollection)
};
ServerBootstrap streamServerBootstrap = bootstrapChannel(shuffleBossGroup, shuffleWorkerGroup,
serverConfig.getNetworkBacklog(), serverConfig.getNetworkTimeout(), streamHandlers);
final int httpBacklog = 32; // HTTP is only used for health check, thus good with small
// value and not configurable
Supplier<ChannelHandler[]> httpHandlers = () -> new ChannelHandler[]{
new HttpServerCodec(),
new HttpObjectAggregator(512 * 1024),
new HttpChannelInboundHandler()
};
// increase http connection timeout for health check endpoint
ServerBootstrap httpBootstrap = bootstrapChannel(healthCheckEventLoopGroup, healthCheckEventLoopGroup,
httpBacklog, serverConfig.getNetworkTimeout()*3, httpHandlers);
// Bind the ports and save the results so that the channels can be closed later. If the second bind fails,
// the first one gets cleaned up in the shutdown.
Pair<Channel, Integer> channelAndPort = bindPort(streamServerBootstrap, serverConfig.getShufflePort());
channels.add(channelAndPort.getKey());
shufflePort = channelAndPort.getValue();
logger.info(String.format("ShuffleServer: %s:%s", hostName, shufflePort));
if (this.serviceRegistry == null && serverConfig.getServiceRegistryType().
equalsIgnoreCase(ServiceRegistry.TYPE_STANDALONE)) {
logger.info(String.format("Creating registry client connecting to local stream server: %s:%s",
hostName, shufflePort));
this.serviceRegistry = new StandaloneServiceRegistryClient(this.hostName, shufflePort,
serverConfig.getNetworkTimeout(), "streamServer");
}
String dataCenter = serverConfig.getDataCenterOrDefault();
String cluster = serverConfig.getClusterOrDefault();
String hostAndPort = String.format("%s:%s", hostName, shufflePort);
logger.info(String.format("Registering shuffle server, data center: %s, cluster: %s, server id: %s, " +
"host and port: %s", dataCenter, cluster, serverId, hostAndPort));
this.serviceRegistry.registerServer(dataCenter, cluster, serverId, runningVersion, hostAndPort);
if (serverConfig.getHttpPort() != -1) {
channelAndPort = bindPort(httpBootstrap, serverConfig.getHttpPort());
channels.add(channelAndPort.getKey());
httpPort = channelAndPort.getValue();
logger.info(String.format("HttpServer: %s:%s", hostName, httpPort));
} else {
httpPort = serverConfig.getHttpPort();
}
if (serverConfig.getStorage() instanceof ShuffleFileStorage) {
CompletableFuture.runAsync(() -> {
FileUtils.cleanupOldFiles(serverConfig.getRootDirectory(), System.currentTimeMillis()
- serverConfig.getAppFileRetentionMillis());
});
}
M3Stats.getDefaultScope().counter("serverStart").inc(1);
}