in server/src/main/java/org/apache/uniffle/server/ShuffleServer.java [229:353]
/**
 * Validates the configuration, then builds and starts the components of the shuffle server.
 *
 * <p>All checks that depend only on configuration or the host address run first: the storage
 * type, the host IP and the netty port. Only after they pass are background components started.
 * Those components are the JVM pause monitor, storage manager, health check and task manager.
 * If a check fails here, no component has been started yet. Previously a failure could leave
 * already-started components running inside a server object that was never fully constructed.
 *
 * @throws IllegalArgumentException if LOCALFILE or HDFS storage is configured outside test mode
 * @throws RssException if the host IP cannot be acquired, if the netty port is not set while the
 *     RPC server type is GRPC_NETTY, or if remote merge is enabled without a SKIP_LIST buffer
 * @throws Exception propagated from constructing or starting the server components
 */
private void initialization() throws Exception {
  // Phase 1: validate configuration and resolve addresses. Nothing has been started yet, so a
  // failure here leaves no running components behind.
  checkStorageTypeAllowed();
  ip = RssUtils.getHostIp();
  if (ip == null) {
    throw new RssException("Couldn't acquire host Ip");
  }
  grpcPort = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
  nettyPort = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);
  nettyServerEnabled =
      shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE) == ServerType.GRPC_NETTY;
  if (nettyServerEnabled && nettyPort < 0) {
    throw new RssException(
        String.format(
            "%s must be set during startup when using GRPC_NETTY",
            ShuffleServerConf.NETTY_SERVER_PORT.key()));
  }

  // Phase 2: start the JVM pause monitor before the remaining (potentially slow) startup work.
  final JvmPauseMonitor monitor = new JvmPauseMonitor(shuffleServerConf);
  monitor.start();
  this.jvmPauseMonitor = monitor;

  initServerTags();
  jettyServer = new JettyServer(shuffleServerConf);
  // Keep registerMetrics() before registerJerseyResources(): the latter reads grpcMetrics and
  // nettyMetrics (presumably initialized by registerMetrics() — confirm before reordering).
  registerMetrics();
  registerJerseyResources();

  SecurityContextFactory.get().init(createSecurityConfig());
  storageManager = StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
  storageManager.start();
  if (shuffleServerConf.getBoolean(ShuffleServerConf.HEALTH_CHECK_ENABLE)) {
    List<Checker> builtInCheckers = Lists.newArrayList();
    builtInCheckers.add(storageManager.getStorageChecker());
    healthCheck = new HealthCheck(serverStatus, shuffleServerConf, builtInCheckers);
    healthCheck.start();
  }
  if (nettyServerEnabled) {
    // The netty port was already validated in phase 1.
    streamServer = new StreamServer(this);
  }
  registerHeartBeat = new RegisterHeartBeat(this);
  shuffleFlushManager = new ShuffleFlushManager(shuffleServerConf, this, storageManager);
  shuffleBufferManager =
      new ShuffleBufferManager(shuffleServerConf, shuffleFlushManager, nettyServerEnabled);
  remoteMergeEnable = shuffleServerConf.get(ShuffleServerConf.SERVER_MERGE_ENABLE);
  if (remoteMergeEnable) {
    // The buffer type is only known once the buffer manager exists, so this check cannot move
    // into phase 1.
    if (shuffleBufferManager.getShuffleBufferType() != ShuffleBufferType.SKIP_LIST) {
      throw new RssException(
          "Shuffle buffer type must be SKIP_LIST when remote merge is enable!");
    }
    shuffleMergeManager = new ShuffleMergeManager(shuffleServerConf, this);
  }
  shuffleTaskManager =
      new ShuffleTaskManager(
          shuffleServerConf,
          shuffleFlushManager,
          shuffleBufferManager,
          storageManager,
          shuffleMergeManager);
  shuffleTaskManager.start();
  registerDirectMemoryAndJvmPauseGauges();
  setServer();
}

/**
 * Rejects the LOCALFILE and HDFS storage types unless test mode is enabled.
 *
 * @throws IllegalArgumentException if one of those storage types is configured outside test mode
 */
private void checkStorageTypeAllowed() {
  boolean testMode = shuffleServerConf.getBoolean(RSS_TEST_MODE_ENABLE);
  // Compared by enum name rather than by enum identity, as the original check did.
  String storageType = shuffleServerConf.get(RSS_STORAGE_TYPE).name();
  boolean testOnlyStorageType =
      StorageType.LOCALFILE.name().equals(storageType)
          || StorageType.HDFS.name().equals(storageType);
  if (!testMode && testOnlyStorageType) {
    // "HADOOP" in the message refers to StorageType.HDFS.
    throw new IllegalArgumentException(
        "RSS storage type about LOCALFILE and HADOOP should be used in test mode, "
            + "because of the poor performance of these two types.");
  }
}

/**
 * Registers the Jersey resource packages and the instances they inject.
 *
 * <p>The injected instances are this server and one Prometheus collector registry per metric
 * group (server, grpc, netty, jvm), plus a coalesced "#all" view over all of them.
 */
private void registerJerseyResources() {
  jettyServer.addResourcePackages(
      "org.apache.uniffle.server.web.resource", "org.apache.uniffle.common.web.resource");
  jettyServer.registerInstance(ShuffleServer.class, this);
  String registryKey = CollectorRegistry.class.getCanonicalName();
  jettyServer.registerInstance(
      registryKey + "#server", ShuffleServerMetrics.getCollectorRegistry());
  jettyServer.registerInstance(registryKey + "#grpc", grpcMetrics.getCollectorRegistry());
  jettyServer.registerInstance(registryKey + "#netty", nettyMetrics.getCollectorRegistry());
  jettyServer.registerInstance(registryKey + "#jvm", JvmMetrics.getCollectorRegistry());
  jettyServer.registerInstance(
      registryKey + "#all",
      new CoalescedCollectorRegistry(
          ShuffleServerMetrics.getCollectorRegistry(),
          grpcMetrics.getCollectorRegistry(),
          nettyMetrics.getCollectorRegistry(),
          JvmMetrics.getCollectorRegistry()));
}

/**
 * Builds the Hadoop Kerberos security configuration from the server configuration.
 *
 * @return the Kerberos {@link SecurityConfig}, or {@code null} when Kerberos is disabled; the
 *     result is passed as-is to {@code SecurityContextFactory.get().init(...)}
 */
private SecurityConfig createSecurityConfig() {
  if (!shuffleServerConf.getBoolean(RSS_SECURITY_HADOOP_KERBEROS_ENABLE)) {
    return null;
  }
  return SecurityConfig.newBuilder()
      .krb5ConfPath(shuffleServerConf.getString(RSS_SECURITY_HADOOP_KRB5_CONF_FILE))
      .keytabFilePath(shuffleServerConf.getString(RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE))
      .principal(shuffleServerConf.getString(RSS_SECURITY_HADOOP_KERBEROS_PRINCIPAL))
      .reloginIntervalSec(
          shuffleServerConf.getLong(RSS_SECURITY_HADOOP_KERBEROS_RELOGIN_INTERVAL_SEC))
      .enableProxyUser(
          shuffleServerConf.getBoolean(RSS_SECURITY_HADOOP_KERBEROS_PROXY_USER_ENABLE))
      .build();
}

/**
 * Registers gauges for direct memory and for JVM pause statistics.
 *
 * <p>Direct memory is reported for the unshaded netty, for the grpc-shaded netty, and as their
 * sum. The JVM pause gauges read from {@code jvmPauseMonitor}, so it must be assigned first.
 */
private void registerDirectMemoryAndJvmPauseGauges() {
  ShuffleServerMetrics.addLabeledGauge(
      USED_DIRECT_MEMORY_SIZE_BY_NETTY, PlatformDependent::usedDirectMemory);
  ShuffleServerMetrics.addLabeledGauge(
      USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY,
      io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent::usedDirectMemory);
  ShuffleServerMetrics.addLabeledGauge(
      USED_DIRECT_MEMORY_SIZE,
      () ->
          (PlatformDependent.usedDirectMemory()
              + io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent
                  .usedDirectMemory()));
  ShuffleServerMetrics.addLabeledGauge(
      JVM_PAUSE_TOTAL_EXTRA_TIME, jvmPauseMonitor::getTotalGcExtraSleepTime);
  ShuffleServerMetrics.addLabeledGauge(
      JVM_PAUSE_INFO_TIME_EXCEEDED, jvmPauseMonitor::getNumGcInfoThresholdExceeded);
  ShuffleServerMetrics.addLabeledGauge(
      JVM_PAUSE_WARN_TIME_EXCEEDED, jvmPauseMonitor::getNumGcWarnThresholdExceeded);
}