public RssShuffleManager()

in client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java [108:218]


  /**
   * Builds the shuffle manager from the given Spark configuration.
   *
   * <p>The constructor reads the RSS client settings from {@code sparkConf}, checks the quorum
   * (replica) settings, creates the shuffle write client and calls {@code registerCoordinator()}.
   * It then sets {@code spark.shuffle.service.enabled} and {@code
   * spark.shuffle.reduceLocality.enabled} to {@code "false"} on the passed-in {@code sparkConf}.
   * Unless the RSS test flag is set, it also creates the data pusher and, when {@code isDriver} is
   * true, the heartbeat executor and (if stage resubmit is enabled and supported) the shuffle
   * manager server.
   *
   * @param sparkConf Spark configuration to read from; note that this constructor mutates it
   * @param isDriver {@code true} enables the driver-only steps (dynamic client conf fetch,
   *     heartbeat executor, shuffle manager server)
   * @throws IllegalArgumentException if {@code spark.sql.adaptive.enabled} is {@code true}
   * @throws RssException if the shuffle manager server fails to start
   */
  public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
    if (sparkConf.getBoolean("spark.sql.adaptive.enabled", false)) {
      throw new IllegalArgumentException(
          "Spark2 doesn't support AQE, spark.sql.adaptive.enabled should be false.");
    }
    this.sparkConf = sparkConf;
    this.user = sparkConf.get("spark.rss.quota.user", "user");
    this.uuid = sparkConf.get("spark.rss.quota.uuid", Long.toString(System.currentTimeMillis()));

    // Convert the Spark conf once and reuse the result below; sparkConf is not modified
    // between here and the creation of the shuffle write client.
    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);

    // set & check replica config
    this.dataReplica = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA);
    this.dataReplicaWrite = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_WRITE);
    this.dataReplicaRead = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_READ);
    this.dataTransferPoolSize = sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
    this.dataReplicaSkipEnabled = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED);
    this.maxConcurrencyPerPartitionToWrite = rssConf.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
    LOG.info(
        "Check quorum config ["
            + dataReplica
            + ":"
            + dataReplicaWrite
            + ":"
            + dataReplicaRead
            + ":"
            + dataReplicaSkipEnabled
            + "]");
    // NOTE(review): presumably throws on an inconsistent replica/write/read combination.
    RssUtils.checkQuorumSetting(dataReplica, dataReplicaWrite, dataReplicaRead);

    this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
    this.heartbeatInterval = sparkConf.get(RssSparkConfig.RSS_HEARTBEAT_INTERVAL);
    // When no explicit timeout is configured, fall back to half of the heartbeat interval.
    this.heartbeatTimeout =
        sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_TIMEOUT.key(), heartbeatInterval / 2);
    this.dynamicConfEnabled = sparkConf.get(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED);
    int retryMax = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX);
    long retryIntervalMax = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
    int heartBeatThreadNum = sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
    this.dataCommitPoolSize = sparkConf.get(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE);
    int unregisterThreadPoolSize =
        sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_THREAD_POOL_SIZE);
    int unregisterRequestTimeoutSec =
        sparkConf.get(RssSparkConfig.RSS_CLIENT_UNREGISTER_REQUEST_TIMEOUT_SEC);
    this.shuffleWriteClient =
        ShuffleClientFactory.getInstance()
            .createShuffleWriteClient(
                clientType,
                retryMax,
                retryIntervalMax,
                heartBeatThreadNum,
                dataReplica,
                dataReplicaWrite,
                dataReplicaRead,
                dataReplicaSkipEnabled,
                dataTransferPoolSize,
                dataCommitPoolSize,
                unregisterThreadPoolSize,
                unregisterRequestTimeoutSec,
                rssConf);
    registerCoordinator();
    // On the driver only, and only when dynamic conf is enabled: fetch the client conf through
    // the write client and apply it to sparkConf.
    if (isDriver && dynamicConfEnabled) {
      Map<String, String> clusterClientConf =
          shuffleWriteClient.fetchClientConf(sparkConf.get(RssSparkConfig.RSS_ACCESS_TIMEOUT_MS));
      RssSparkShuffleUtils.applyDynamicClientConf(sparkConf, clusterClientConf);
    }
    // Validation runs after the optional dynamic conf has been applied.
    RssSparkShuffleUtils.validateRssClientConf(sparkConf);
    // External shuffle service is not supported when using remote shuffle service
    sparkConf.set("spark.shuffle.service.enabled", "false");
    LOG.info("Disable external shuffle service in RssShuffleManager.");
    // If we store shuffle data in distributed filesystem or in a disaggregated
    // shuffle cluster, we don't need shuffle data locality
    sparkConf.set("spark.shuffle.reduceLocality.enabled", "false");
    LOG.info("Disable shuffle data locality in RssShuffleManager.");
    // Everything below is skipped when the RSS test flag is set.
    if (!sparkConf.getBoolean(RssSparkConfig.RSS_TEST_FLAG.key(), false)) {
      if (isDriver) {
        heartBeatScheduledExecutorService =
            ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
        if (sparkConf.get(RssSparkConfig.RSS_RESUBMIT_STAGE)
            && RssSparkShuffleUtils.isStageResubmitSupported()) {
          LOG.info("stage resubmit is supported and enabled");
          // start shuffle manager server
          // NOTE(review): port 0 presumably asks for an ephemeral port; the actual port is read
          // back via getPort() after start — confirm against the server implementation.
          rssConf.set(RPC_SERVER_PORT, 0);
          ShuffleManagerServerFactory factory = new ShuffleManagerServerFactory(this, rssConf);
          service = factory.getService();
          shuffleManagerServer = factory.getServer(service);
          try {
            shuffleManagerServer.start();
            // pass this as a spark.rss.shuffle.manager.grpc.port config, so it can be propagated to
            // executor properly.
            sparkConf.set(
                RssSparkConfig.RSS_SHUFFLE_MANAGER_GRPC_PORT, shuffleManagerServer.getPort());
          } catch (Exception e) {
            LOG.error("Failed to start shuffle manager server", e);
            throw new RssException(e);
          }
        }
      }
      // The data pusher is created here regardless of isDriver (whenever the test flag is off);
      // it is handed the write client and the per-task block-id bookkeeping collections.
      LOG.info("RSS data pusher is starting...");
      int poolSize = sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_SIZE);
      int keepAliveTime = sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE);
      this.dataPusher =
          new DataPusher(
              shuffleWriteClient,
              taskToSuccessBlockIds,
              taskToFailedBlockIds,
              failedTaskIds,
              poolSize,
              keepAliveTime);
    }
  }