private ShuffleAssignmentsInfo getShuffleWorks()

in client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java [171:257]


  /**
   * Requests shuffle server assignments for the given shuffle through {@code rssClient} and
   * registers the shuffle on every server contained in the returned assignment.
   *
   * <p>The assignment request and the registration run together inside {@code RetryUtils.retry},
   * using the retry interval and retry times read from the configuration. Test-mode validation
   * and the remote storage lookup happen before the retried task and are not wrapped.
   *
   * @param partitionNum partition count passed to the assignment request
   * @param shuffleId id of the shuffle to assign and register
   * @return the assignments after registration; presumably {@code null} when the assignment holds
   *     no server-to-partition-range mapping (the retried task returns {@code null} in that case;
   *     this assumes {@code RetryUtils.retry} hands that value back unchanged)
   * @throws RssException wrapping anything thrown out of the retried assignment/registration task
   */
  private ShuffleAssignmentsInfo getShuffleWorks(int partitionNum, int shuffleId) {
    int requiredAssignmentShuffleServersNum =
        RssTezUtils.getRequiredShuffleServerNumber(conf, 200, partitionNum);
    // retryInterval must be larger than `rss.server.heartbeat.timeout`; otherwise a retry may
    // return the same result.
    long retryInterval =
        conf.getLong(
            RssTezConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL,
            RssTezConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);
    int retryTimes =
        conf.getInt(
            RssTezConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES,
            RssTezConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE);

    // Collect the configured server assignment tags, then always add the default shuffle
    // version tag.
    Set<String> assignmentTags = new HashSet<>();
    String rawTags = conf.get(RssTezConfig.RSS_CLIENT_ASSIGNMENT_TAGS, "");
    if (StringUtils.isNotEmpty(rawTags)) {
      // Trim every tag individually and skip empty ones, so values such as "a, b", ",a" or a
      // whitespace-only setting do not produce tags like " b" or "".
      for (String rawTag : rawTags.split(",")) {
        String tag = rawTag.trim();
        if (!tag.isEmpty()) {
          assignmentTags.add(tag);
        }
      }
    }
    assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);

    // get remote storage from coordinator if necessary
    boolean dynamicConfEnabled =
        conf.getBoolean(
            RssTezConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED,
            RssTezConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE);
    RemoteStorageInfo defaultRemoteStorage =
        new RemoteStorageInfo(conf.get(RssTezConfig.RSS_REMOTE_STORAGE_PATH, ""));
    String storageType =
        conf.get(RssTezConfig.RSS_STORAGE_TYPE, RssTezConfig.RSS_STORAGE_TYPE_DEFAULT_VALUE);
    boolean testMode = conf.getBoolean(RssTezConfig.RSS_TEST_MODE_ENABLE, false);
    ClientUtils.validateTestModeConf(testMode, storageType);
    RemoteStorageInfo remoteStorage =
        ClientUtils.fetchRemoteStorage(
            appId, defaultRemoteStorage, dynamicConfEnabled, storageType, rssClient);

    try {
      return RetryUtils.retry(
          () -> {
            // A fresh copy of the tag set is handed to the client on every attempt.
            ShuffleAssignmentsInfo shuffleAssignments =
                rssClient.getShuffleAssignments(
                    appId,
                    shuffleId,
                    partitionNum,
                    1,
                    Sets.newHashSet(assignmentTags),
                    requiredAssignmentShuffleServersNum,
                    -1);

            Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges =
                shuffleAssignments.getServerToPartitionRanges();

            // Nothing to register: end this attempt with a null result.
            if (serverToPartitionRanges == null || serverToPartitionRanges.isEmpty()) {
              return null;
            }
            LOG.info("Start to register shuffle");
            long start = System.currentTimeMillis();
            // NOTE(review): RssTezConfig.toRssConf(conf) is re-evaluated for every server;
            // it looks loop-invariant and could be hoisted once its return type is confirmed.
            serverToPartitionRanges.forEach(
                (shuffleServer, partitionRanges) ->
                    rssClient.registerShuffle(
                        shuffleServer,
                        appId,
                        shuffleId,
                        partitionRanges,
                        remoteStorage,
                        ShuffleDataDistributionType.NORMAL,
                        RssTezConfig.toRssConf(conf)
                            .get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE)));
            LOG.info(
                "Finish register shuffle with " + (System.currentTimeMillis() - start) + " ms");
            return shuffleAssignments;
          },
          retryInterval,
          retryTimes);
    } catch (Throwable throwable) {
      LOG.error("registerShuffle failed!", throwable);
      throw new RssException("registerShuffle failed!", throwable);
    }
  }