client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java [119:176]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Sends one access request to the coordinator behind {@code coordinatorClient} and reports
   * whether the cluster accepted it.
   *
   * <p>The request carries the access id, the assignment tags, the required shuffle server number
   * and those entries of the {@link RssConf} derived from {@code sparkConf} that pass the report
   * include/exclude filters. When the coordinator answers with {@code StatusCode.SUCCESS}, the
   * uuid from the response is stored in the {@code uuid} field.
   *
   * <p>Any failure of the access call (denied, unreachable, or an unexpected throwable) is logged
   * and turned into a {@code false} result instead of being propagated.
   *
   * @param coordinatorClient client used to issue the request; {@code null} yields {@code false}
   * @return {@code true} only if the coordinator replied with {@code StatusCode.SUCCESS}
   */
  private boolean tryAccessCluster(CoordinatorClient coordinatorClient) {
    String accessId = DelegationRssShuffleManagerUtils.acquireAccessId(sparkConf);
    if (accessId == null) {
      LOG.warn("Access id key is null");
      return false;
    }
    if (coordinatorClient == null) {
      // Nothing to contact, so do not bother assembling the request payload.
      return false;
    }
    long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
    int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);

    int assignmentShuffleNodesNum =
        sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);
    Map<String, String> extraProperties = Maps.newHashMap();
    extraProperties.put(
        ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum));

    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
    List<String> excludeProperties =
        rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
    List<String> includeProperties =
        rssConf.get(RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES);
    // A missing include list means "report everything"; a missing exclude list means "exclude
    // nothing". Both lists are treated as optional so an unset option cannot trigger an NPE here,
    // outside of the try block below.
    rssConf.getAll().stream()
        .filter(entry -> includeProperties == null || includeProperties.contains(entry.getKey()))
        .filter(entry -> excludeProperties == null || !excludeProperties.contains(entry.getKey()))
        .forEach(
            entry -> {
              // Convert instead of casting: a non-String value must not abort the whole attempt
              // with a ClassCastException. Null values are passed through unchanged.
              Object value = entry.getValue();
              extraProperties.put(entry.getKey(), value == null ? null : value.toString());
            });

    Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
    try {
      RssAccessClusterResponse response =
          coordinatorClient.accessCluster(
              new RssAccessClusterRequest(
                  accessId,
                  assignmentTags,
                  accessTimeoutMs,
                  extraProperties,
                  user,
                  retryInterval,
                  retryTimes));
      if (response.getStatusCode() == StatusCode.SUCCESS) {
        LOG.warn("Success to access cluster using {}", accessId);
        uuid = response.getUuid();
        return true;
      } else if (response.getStatusCode() == StatusCode.ACCESS_DENIED) {
        throw new RssException(
            "Request to access cluster is denied using "
                + accessId
                + " for "
                + response.getMessage());
      } else {
        throw new RssException("Fail to reach cluster for " + response.getMessage());
      }
    } catch (Throwable e) {
      // Best effort: every failure of the access call, including the RssExceptions raised just
      // above, is logged with its stack trace and reported to the caller as "no access".
      LOG.warn("Fail to access cluster using {} for ", accessId, e);
    }

    return false;
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java [119:176]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Sends one access request to the coordinator behind {@code coordinatorClient} and reports
   * whether the cluster accepted it.
   *
   * <p>The request carries the access id, the assignment tags, the required shuffle server number
   * and those entries of the {@link RssConf} derived from {@code sparkConf} that pass the report
   * include/exclude filters. When the coordinator answers with {@code StatusCode.SUCCESS}, the
   * uuid from the response is stored in the {@code uuid} field.
   *
   * <p>Any failure of the access call (denied, unreachable, or an unexpected throwable) is logged
   * and turned into a {@code false} result instead of being propagated.
   *
   * @param coordinatorClient client used to issue the request; {@code null} yields {@code false}
   * @return {@code true} only if the coordinator replied with {@code StatusCode.SUCCESS}
   */
  private boolean tryAccessCluster(CoordinatorClient coordinatorClient) {
    String accessId = DelegationRssShuffleManagerUtils.acquireAccessId(sparkConf);
    if (accessId == null) {
      LOG.warn("Access id key is null");
      return false;
    }
    if (coordinatorClient == null) {
      // Nothing to contact, so do not bother assembling the request payload.
      return false;
    }
    long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
    int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);

    int assignmentShuffleNodesNum =
        sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);
    Map<String, String> extraProperties = Maps.newHashMap();
    extraProperties.put(
        ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum));

    RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
    List<String> excludeProperties =
        rssConf.get(RssClientConf.RSS_CLIENT_REPORT_EXCLUDE_PROPERTIES);
    List<String> includeProperties =
        rssConf.get(RssClientConf.RSS_CLIENT_REPORT_INCLUDE_PROPERTIES);
    // A missing include list means "report everything"; a missing exclude list means "exclude
    // nothing". Both lists are treated as optional so an unset option cannot trigger an NPE here,
    // outside of the try block below.
    rssConf.getAll().stream()
        .filter(entry -> includeProperties == null || includeProperties.contains(entry.getKey()))
        .filter(entry -> excludeProperties == null || !excludeProperties.contains(entry.getKey()))
        .forEach(
            entry -> {
              // Convert instead of casting: a non-String value must not abort the whole attempt
              // with a ClassCastException. Null values are passed through unchanged.
              Object value = entry.getValue();
              extraProperties.put(entry.getKey(), value == null ? null : value.toString());
            });

    Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
    try {
      RssAccessClusterResponse response =
          coordinatorClient.accessCluster(
              new RssAccessClusterRequest(
                  accessId,
                  assignmentTags,
                  accessTimeoutMs,
                  extraProperties,
                  user,
                  retryInterval,
                  retryTimes));
      if (response.getStatusCode() == StatusCode.SUCCESS) {
        LOG.warn("Success to access cluster using {}", accessId);
        uuid = response.getUuid();
        return true;
      } else if (response.getStatusCode() == StatusCode.ACCESS_DENIED) {
        throw new RssException(
            "Request to access cluster is denied using "
                + accessId
                + " for "
                + response.getMessage());
      } else {
        throw new RssException("Fail to reach cluster for " + response.getMessage());
      }
    } catch (Throwable e) {
      // Best effort: every failure of the access call, including the RssExceptions raised just
      // above, is logged with its stack trace and reported to the caller as "no access".
      LOG.warn("Fail to access cluster using {} for ", accessId, e);
    }

    return false;
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



