private void initialization()

in server/src/main/java/org/apache/uniffle/server/ShuffleServer.java [229:353]


  /**
   * Brings up the components of this shuffle server, in a fixed order.
   *
   * <p>The sequence is: JVM pause monitor, storage-type/test-mode validation, host IP and port
   * lookup, server tags, Jetty server with its metric registries, security context, storage
   * manager, optional health check, optional stream server, heartbeat / flush / buffer / merge /
   * task managers, direct-memory and JVM-pause gauges, and finally {@code setServer()}.
   *
   * <p>Several collaborators receive {@code this} or fields assigned earlier in the method, so
   * the statement order must not be changed casually.
   *
   * @throws IllegalArgumentException if test mode is disabled and the configured storage type is
   *     LOCALFILE or HDFS
   * @throws RssException if the host IP cannot be acquired, if the server type is GRPC_NETTY and
   *     the netty port is negative, or if remote merge is enabled while the shuffle buffer type
   *     is not SKIP_LIST
   * @throws Exception declared by the signature; any failure raised by a component started here
   *     is propagated to the caller
   */
  private void initialization() throws Exception {
    // JVM pause monitor: the field is only assigned once the monitor has been started.
    final JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(shuffleServerConf);
    pauseMonitor.start();
    this.jvmPauseMonitor = pauseMonitor;

    // LOCALFILE and HDFS storage types are rejected unless test mode is switched on.
    final boolean inTestMode = shuffleServerConf.getBoolean(RSS_TEST_MODE_ENABLE);
    final String configuredStorage = shuffleServerConf.get(RSS_STORAGE_TYPE).name();
    if (!inTestMode) {
      final boolean testOnlyStorage =
          StorageType.LOCALFILE.name().equals(configuredStorage)
              || StorageType.HDFS.name().equals(configuredStorage);
      if (testOnlyStorage) {
        throw new IllegalArgumentException(
            "RSS storage type about LOCALFILE and HADOOP should be used in test mode, "
                + "because of the poor performance of these two types.");
      }
    }

    // Network identity of this server.
    ip = RssUtils.getHostIp();
    if (ip == null) {
      throw new RssException("Couldn't acquire host Ip");
    }
    grpcPort = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
    nettyPort = shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);

    initServerTags();

    // Jetty server: jersey resource packages plus the instances exposed to them.
    jettyServer = new JettyServer(shuffleServerConf);
    // NOTE(review): registerMetrics() presumably prepares grpcMetrics/nettyMetrics, which are
    // read right below — confirm before moving this call.
    registerMetrics();
    jettyServer.addResourcePackages(
        "org.apache.uniffle.server.web.resource", "org.apache.uniffle.common.web.resource");
    jettyServer.registerInstance(ShuffleServer.class, this);

    // Every collector registry is registered under "<CollectorRegistry canonical name>#<suffix>".
    final String registryKeyPrefix = CollectorRegistry.class.getCanonicalName();
    jettyServer.registerInstance(
        registryKeyPrefix + "#server", ShuffleServerMetrics.getCollectorRegistry());
    jettyServer.registerInstance(registryKeyPrefix + "#grpc", grpcMetrics.getCollectorRegistry());
    jettyServer.registerInstance(
        registryKeyPrefix + "#netty", nettyMetrics.getCollectorRegistry());
    jettyServer.registerInstance(registryKeyPrefix + "#jvm", JvmMetrics.getCollectorRegistry());
    // "#all" wraps the four registries above in a single coalesced registry.
    jettyServer.registerInstance(
        registryKeyPrefix + "#all",
        new CoalescedCollectorRegistry(
            ShuffleServerMetrics.getCollectorRegistry(),
            grpcMetrics.getCollectorRegistry(),
            nettyMetrics.getCollectorRegistry(),
            JvmMetrics.getCollectorRegistry()));

    // Security context: a Kerberos config is built only when enabled, otherwise null is passed.
    final boolean kerberosEnabled =
        shuffleServerConf.getBoolean(RSS_SECURITY_HADOOP_KERBEROS_ENABLE);
    final SecurityConfig kerberosConfig =
        kerberosEnabled
            ? SecurityConfig.newBuilder()
                .krb5ConfPath(shuffleServerConf.getString(RSS_SECURITY_HADOOP_KRB5_CONF_FILE))
                .keytabFilePath(
                    shuffleServerConf.getString(RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE))
                .principal(shuffleServerConf.getString(RSS_SECURITY_HADOOP_KERBEROS_PRINCIPAL))
                .reloginIntervalSec(
                    shuffleServerConf.getLong(RSS_SECURITY_HADOOP_KERBEROS_RELOGIN_INTERVAL_SEC))
                .enableProxyUser(
                    shuffleServerConf.getBoolean(RSS_SECURITY_HADOOP_KERBEROS_PROXY_USER_ENABLE))
                .build()
            : null;
    SecurityContextFactory.get().init(kerberosConfig);

    // Storage layer.
    storageManager = StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
    storageManager.start();

    // Optional health check, seeded with the storage manager's checker.
    if (shuffleServerConf.getBoolean(ShuffleServerConf.HEALTH_CHECK_ENABLE)) {
      final List<Checker> checkers = Lists.newArrayList();
      checkers.add(storageManager.getStorageChecker());
      healthCheck = new HealthCheck(serverStatus, shuffleServerConf, checkers);
      healthCheck.start();
    }

    // The stream server is created only for GRPC_NETTY and requires a non-negative netty port.
    nettyServerEnabled =
        ServerType.GRPC_NETTY == shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE);
    if (nettyServerEnabled) {
      if (nettyPort < 0) {
        throw new RssException(
            String.format(
                "%s must be set during startup when using GRPC_NETTY",
                ShuffleServerConf.NETTY_SERVER_PORT.key()));
      }
      streamServer = new StreamServer(this);
    }

    // Managers: each one is handed collaborators created earlier in this method.
    registerHeartBeat = new RegisterHeartBeat(this);
    shuffleFlushManager = new ShuffleFlushManager(shuffleServerConf, this, storageManager);
    shuffleBufferManager =
        new ShuffleBufferManager(shuffleServerConf, shuffleFlushManager, nettyServerEnabled);

    // Remote merge is only accepted together with the SKIP_LIST buffer type.
    remoteMergeEnable = shuffleServerConf.get(ShuffleServerConf.SERVER_MERGE_ENABLE);
    if (remoteMergeEnable) {
      if (ShuffleBufferType.SKIP_LIST != shuffleBufferManager.getShuffleBufferType()) {
        throw new RssException(
            "Shuffle buffer type must be SKIP_LIST when remote merge is enable!");
      }
      shuffleMergeManager = new ShuffleMergeManager(shuffleServerConf, this);
    }

    // shuffleMergeManager is passed as-is; it is only assigned above when remote merge is on.
    shuffleTaskManager =
        new ShuffleTaskManager(
            shuffleServerConf,
            shuffleFlushManager,
            shuffleBufferManager,
            storageManager,
            shuffleMergeManager);
    shuffleTaskManager.start();

    // Direct-memory gauges: netty, the netty shaded inside grpc, and the sum of both.
    ShuffleServerMetrics.addLabeledGauge(
        USED_DIRECT_MEMORY_SIZE_BY_NETTY, PlatformDependent::usedDirectMemory);
    ShuffleServerMetrics.addLabeledGauge(
        USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY,
        io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent::usedDirectMemory);
    ShuffleServerMetrics.addLabeledGauge(
        USED_DIRECT_MEMORY_SIZE,
        () ->
            (PlatformDependent.usedDirectMemory()
                + io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent
                    .usedDirectMemory()));

    // JVM pause gauges backed by the monitor started at the top of this method.
    ShuffleServerMetrics.addLabeledGauge(
        JVM_PAUSE_TOTAL_EXTRA_TIME, jvmPauseMonitor::getTotalGcExtraSleepTime);
    ShuffleServerMetrics.addLabeledGauge(
        JVM_PAUSE_INFO_TIME_EXCEEDED, jvmPauseMonitor::getNumGcInfoThresholdExceeded);
    ShuffleServerMetrics.addLabeledGauge(
        JVM_PAUSE_WARN_TIME_EXCEEDED, jvmPauseMonitor::getNumGcWarnThresholdExceeded);

    setServer();
  }