public RssShuffleManagerBase()

in client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java [178:343]


  /**
   * Builds the shuffle-manager base: validates the Spark configuration, derives the RSS client
   * configuration, sets up replica/quorum settings, block-id layout, stage-retry and
   * partition-reassign features, optionally starts the shuffle-manager RPC server (driver only),
   * and launches the heartbeat scheduler and data pusher.
   *
   * @param conf the Spark configuration; several keys are overwritten here (external shuffle
   *     service, shuffle tracking, local shuffle reader, reduce locality) because they are
   *     incompatible with remote shuffle
   * @param isDriver true when constructed on the driver, which enables dynamic-conf fetching and
   *     the shuffle-manager RPC server
   * @throws RssException if partition reassign is combined with multiple replicas, or if the
   *     shuffle-manager server fails to start
   */
  public RssShuffleManagerBase(SparkConf conf, boolean isDriver) {
    LOG.info(
        "Uniffle {} version: {}", this.getClass().getName(), Constants.VERSION_AND_REVISION_SHORT);
    this.sparkConf = conf;
    checkSupported(sparkConf);
    // RSS buffers serialized records and relocates them; warn (not fail) if the configured
    // serializer cannot guarantee that relocation is safe. Defaults to true when SparkEnv is
    // absent (e.g. in tests).
    boolean supportsRelocation =
        Optional.ofNullable(SparkEnv.get())
            .map(env -> env.serializer().supportsRelocationOfSerializedObjects())
            .orElse(true);
    if (!supportsRelocation) {
      LOG.warn(
          "RSSShuffleManager requires a serializer which supports relocations of serialized object. Please set "
              + "spark.serializer to org.apache.spark.serializer.KryoSerializer instead");
    }
    this.user = sparkConf.get("spark.rss.quota.user", "user");
    this.uuid = sparkConf.get("spark.rss.quota.uuid", Long.toString(System.currentTimeMillis()));
    this.dynamicConfEnabled = sparkConf.get(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED);

    // fetch client conf and apply them if necessary
    if (isDriver && this.dynamicConfEnabled) {
      fetchAndApplyDynamicConf(sparkConf);
    }
    RssSparkShuffleUtils.validateRssClientConf(sparkConf);

    // convert spark conf to rss conf after fetching dynamic client conf
    this.rssConf = RssSparkConfig.toRssConf(sparkConf);
    RssUtils.setExtraJavaProperties(rssConf);

    // set & check replica config
    this.dataReplica = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA);
    this.dataReplicaWrite = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_WRITE);
    this.dataReplicaRead = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_READ);
    this.dataReplicaSkipEnabled = sparkConf.get(RssSparkConfig.RSS_DATA_REPLICA_SKIP_ENABLED);
    LOG.info(
        "Check quorum config ["
            + dataReplica
            + ":"
            + dataReplicaWrite
            + ":"
            + dataReplicaRead
            + ":"
            + dataReplicaSkipEnabled
            + "]");
    RssUtils.checkQuorumSetting(dataReplica, dataReplicaWrite, dataReplicaRead);

    this.maxConcurrencyPerPartitionToWrite = rssConf.get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);

    this.clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);

    // configure block id layout
    this.maxFailures = sparkConf.getInt("spark.task.maxFailures", 4);
    this.speculation = sparkConf.getBoolean("spark.speculation", false);
    // configureBlockIdLayout requires maxFailures and speculation to be initialized
    configureBlockIdLayout(sparkConf, rssConf);
    this.blockIdLayout = BlockIdLayout.from(rssConf);

    this.dataTransferPoolSize = sparkConf.get(RssSparkConfig.RSS_DATA_TRANSFER_POOL_SIZE);
    this.dataCommitPoolSize = sparkConf.get(RssSparkConfig.RSS_DATA_COMMIT_POOL_SIZE);

    // External shuffle service is not supported when using remote shuffle service
    sparkConf.set("spark.shuffle.service.enabled", "false");
    sparkConf.set("spark.dynamicAllocation.shuffleTracking.enabled", "false");
    sparkConf.set(RssSparkConfig.RSS_ENABLED.key(), "true");
    LOG.info("Disable external shuffle service in RssShuffleManager.");
    sparkConf.set("spark.sql.adaptive.localShuffleReader.enabled", "false");
    LOG.info("Disable local shuffle reader in RssShuffleManager.");
    // If we store shuffle data in distributed filesystem or in a disaggregated
    // shuffle cluster, we don't need shuffle data locality
    sparkConf.set("spark.shuffle.reduceLocality.enabled", "false");
    LOG.info("Disable shuffle data locality in RssShuffleManager.");

    taskToSuccessBlockIds = JavaUtils.newConcurrentMap();
    taskToFailedBlockSendTracker = JavaUtils.newConcurrentMap();
    this.shuffleIdToPartitionNum = JavaUtils.newConcurrentMap();
    this.shuffleIdToNumMapTasks = JavaUtils.newConcurrentMap();

    // stage retry for write/fetch failure
    rssStageRetryForFetchFailureEnabled =
        rssConf.get(RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED);
    rssStageRetryForWriteFailureEnabled =
        rssConf.get(RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED);
    if (rssStageRetryForFetchFailureEnabled || rssStageRetryForWriteFailureEnabled) {
      rssStageRetryEnabled = true;
      List<String> logTips = new ArrayList<>();
      if (rssStageRetryForWriteFailureEnabled) {
        logTips.add("write");
      }
      // BUGFIX: previously re-tested the write flag here, so "fetch" was logged iff the
      // write-failure retry was enabled, and never when only fetch-failure retry was on.
      if (rssStageRetryForFetchFailureEnabled) {
        logTips.add("fetch");
      }
      LOG.info(
          "Activate the stage retry mechanism that will resubmit stage on {} failure",
          StringUtils.join(logTips, "/"));
    }

    this.partitionReassignEnabled = rssConf.get(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
    // The feature of partition reassign is exclusive with multiple replicas and stage retry.
    if (partitionReassignEnabled) {
      if (dataReplica > 1) {
        throw new RssException(
            "The feature of task partition reassign is incompatible with multiple replicas mechanism.");
      }
      this.partitionSplitMode = rssConf.get(RssClientConf.RSS_CLIENT_PARTITION_SPLIT_MODE);
      this.partitionSplitLoadBalanceServerNum =
          rssConf.get(RssClientConf.RSS_CLIENT_PARTITION_SPLIT_LOAD_BALANCE_SERVER_NUMBER);
      LOG.info("Partition reassign is enabled.");
    }
    this.blockIdSelfManagedEnabled = rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
    // The in-process RPC service is needed by any of these features (or the RSS Spark UI).
    this.shuffleManagerRpcServiceEnabled =
        partitionReassignEnabled
            || rssStageRetryEnabled
            || blockIdSelfManagedEnabled
            || isSparkUIEnabled(conf);

    if (isDriver && shuffleManagerRpcServiceEnabled) {
      LOG.info("stage resubmit is supported and enabled");
      // start shuffle manager server; port 0 lets the server pick a free port
      rssConf.set(RPC_SERVER_PORT, 0);
      ShuffleManagerServerFactory factory = new ShuffleManagerServerFactory(this, rssConf);
      service = factory.getService();
      shuffleManagerServer = factory.getServer(service);
      try {
        shuffleManagerServer.start();
        // pass this as a spark.rss.shuffle.manager.grpc.port config, so it can be propagated to
        // executor properly.
        sparkConf.set(
            RssSparkConfig.RSS_SHUFFLE_MANAGER_GRPC_PORT, shuffleManagerServer.getPort());
      } catch (Exception e) {
        LOG.error("Failed to start shuffle manager server", e);
        throw new RssException(e);
      }
    }
    if (shuffleManagerRpcServiceEnabled) {
      getOrCreateShuffleManagerClientSupplier();
    }

    // Start heartbeat thread.
    this.heartbeatInterval = sparkConf.get(RssSparkConfig.RSS_HEARTBEAT_INTERVAL);
    this.heartbeatTimeout =
        sparkConf.getLong(RssSparkConfig.RSS_HEARTBEAT_TIMEOUT.key(), heartbeatInterval / 2);
    // BUGFIX: the scheduler is created exactly once here. Previously the driver path created a
    // second, identical executor earlier in the constructor, leaking its daemon thread when this
    // assignment overwrote the reference.
    heartBeatScheduledExecutorService =
        ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");

    this.shuffleWriteClient = createShuffleWriteClient();
    registerCoordinator();

    LOG.info("Rss data pusher is starting...");
    int poolSize = sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_SIZE);
    int keepAliveTime = sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE);
    this.dataPusher =
        new DataPusher(
            shuffleWriteClient,
            taskToSuccessBlockIds,
            taskToFailedBlockSendTracker,
            failedTaskIds,
            poolSize,
            keepAliveTime);
    this.partitionReassignMaxServerNum =
        rssConf.get(RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM);
    this.shuffleHandleInfoManager = new ShuffleHandleInfoManager();
    this.rssStageResubmitManager = new RssStageResubmitManager();
  }