client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java [113:175]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private boolean tryAccessCluster() {
    String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim();
    if (StringUtils.isEmpty(accessId)) {
      LOG.warn("Access id key is empty");
      return false;
    }
    long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
    int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);

    int assignmentShuffleNodesNum =
        sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);
    Map<String, String> extraProperties = Maps.newHashMap();
    extraProperties.put(
        ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum));

    for (CoordinatorClient coordinatorClient : coordinatorClients) {
      Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
      boolean canAccess;
      try {
        canAccess =
            RetryUtils.retry(
                () -> {
                  RssAccessClusterResponse response =
                      coordinatorClient.accessCluster(
                          new RssAccessClusterRequest(
                              accessId, assignmentTags, accessTimeoutMs, extraProperties, user));
                  if (response.getStatusCode() == StatusCode.SUCCESS) {
                    LOG.warn(
                        "Success to access cluster {} using {}",
                        coordinatorClient.getDesc(),
                        accessId);
                    uuid = response.getUuid();
                    return true;
                  } else if (response.getStatusCode() == StatusCode.ACCESS_DENIED) {
                    throw new RssException(
                        "Request to access cluster "
                            + coordinatorClient.getDesc()
                            + " is denied using "
                            + accessId
                            + " for "
                            + response.getMessage());
                  } else {
                    throw new RssException(
                        "Fail to reach cluster "
                            + coordinatorClient.getDesc()
                            + " for "
                            + response.getMessage());
                  }
                },
                retryInterval,
                retryTimes);
        return canAccess;
      } catch (Throwable e) {
        LOG.warn(
            "Fail to access cluster {} using {} for {}",
            coordinatorClient.getDesc(),
            accessId,
            e.getMessage());
      }
    }

    return false;
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java [113:175]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private boolean tryAccessCluster() {
    String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), "").trim();
    if (StringUtils.isEmpty(accessId)) {
      LOG.warn("Access id key is empty");
      return false;
    }
    long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
    int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);

    int assignmentShuffleNodesNum =
        sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);
    Map<String, String> extraProperties = Maps.newHashMap();
    extraProperties.put(
        ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum));

    for (CoordinatorClient coordinatorClient : coordinatorClients) {
      Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
      boolean canAccess;
      try {
        canAccess =
            RetryUtils.retry(
                () -> {
                  RssAccessClusterResponse response =
                      coordinatorClient.accessCluster(
                          new RssAccessClusterRequest(
                              accessId, assignmentTags, accessTimeoutMs, extraProperties, user));
                  if (response.getStatusCode() == StatusCode.SUCCESS) {
                    LOG.warn(
                        "Success to access cluster {} using {}",
                        coordinatorClient.getDesc(),
                        accessId);
                    uuid = response.getUuid();
                    return true;
                  } else if (response.getStatusCode() == StatusCode.ACCESS_DENIED) {
                    throw new RssException(
                        "Request to access cluster "
                            + coordinatorClient.getDesc()
                            + " is denied using "
                            + accessId
                            + " for "
                            + response.getMessage());
                  } else {
                    throw new RssException(
                        "Fail to reach cluster "
                            + coordinatorClient.getDesc()
                            + " for "
                            + response.getMessage());
                  }
                },
                retryInterval,
                retryTimes);
        return canAccess;
      } catch (Throwable e) {
        LOG.warn(
            "Fail to access cluster {} using {} for {}",
            coordinatorClient.getDesc(),
            accessId,
            e.getMessage());
      }
    }

    return false;
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



