in server/src/main/java/org/apache/uniffle/server/ShuffleServer.java [193:269]
private void initialization() throws Exception {
boolean testMode = shuffleServerConf.getBoolean(RSS_TEST_MODE_ENABLE);
String storageType = shuffleServerConf.get(RSS_STORAGE_TYPE).name();
if (!testMode
&& (StorageType.LOCALFILE.name().equals(storageType)
|| (StorageType.HDFS.name()).equals(storageType))) {
throw new IllegalArgumentException(
"RSS storage type about LOCALFILE and HADOOP should be used in test mode, "
+ "because of the poor performance of these two types.");
}
ip = RssUtils.getHostIp();
if (ip == null) {
throw new RssException("Couldn't acquire host Ip");
}
grpcPort = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
nettyPort = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);
jettyServer = new JettyServer(shuffleServerConf);
registerMetrics();
// register packages and instances for jersey
jettyServer.addResourcePackages("org.apache.uniffle.common.web.resource");
jettyServer.registerInstance(
CollectorRegistry.class.getCanonicalName() + "#server",
ShuffleServerMetrics.getCollectorRegistry());
jettyServer.registerInstance(
CollectorRegistry.class.getCanonicalName() + "#grpc", grpcMetrics.getCollectorRegistry());
jettyServer.registerInstance(
CollectorRegistry.class.getCanonicalName() + "#netty", nettyMetrics.getCollectorRegistry());
jettyServer.registerInstance(
CollectorRegistry.class.getCanonicalName() + "#jvm", JvmMetrics.getCollectorRegistry());
jettyServer.registerInstance(
CollectorRegistry.class.getCanonicalName() + "#all",
new CoalescedCollectorRegistry(
ShuffleServerMetrics.getCollectorRegistry(),
grpcMetrics.getCollectorRegistry(),
nettyMetrics.getCollectorRegistry(),
JvmMetrics.getCollectorRegistry()));
SecurityConfig securityConfig = null;
if (shuffleServerConf.getBoolean(RSS_SECURITY_HADOOP_KERBEROS_ENABLE)) {
securityConfig =
SecurityConfig.newBuilder()
.krb5ConfPath(shuffleServerConf.getString(RSS_SECURITY_HADOOP_KRB5_CONF_FILE))
.keytabFilePath(shuffleServerConf.getString(RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE))
.principal(shuffleServerConf.getString(RSS_SECURITY_HADOOP_KERBEROS_PRINCIPAL))
.reloginIntervalSec(
shuffleServerConf.getLong(RSS_SECURITY_HADOOP_KERBEROS_RELOGIN_INTERVAL_SEC))
.build();
}
SecurityContextFactory.get().init(securityConfig);
storageManager = StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
storageManager.start();
boolean healthCheckEnable = shuffleServerConf.getBoolean(ShuffleServerConf.HEALTH_CHECK_ENABLE);
if (healthCheckEnable) {
List<Checker> builtInCheckers = Lists.newArrayList();
builtInCheckers.add(storageManager.getStorageChecker());
healthCheck = new HealthCheck(serverStatus, shuffleServerConf, builtInCheckers);
healthCheck.start();
}
registerHeartBeat = new RegisterHeartBeat(this);
shuffleFlushManager = new ShuffleFlushManager(shuffleServerConf, this, storageManager);
shuffleBufferManager = new ShuffleBufferManager(shuffleServerConf, shuffleFlushManager);
shuffleTaskManager =
new ShuffleTaskManager(
shuffleServerConf, shuffleFlushManager, shuffleBufferManager, storageManager);
nettyServerEnabled = shuffleServerConf.get(ShuffleServerConf.NETTY_SERVER_PORT) >= 0;
if (nettyServerEnabled) {
streamServer = new StreamServer(this);
}
setServer();
initServerTags();
}