private ShuffleAssignmentsInfo getShuffleWorks()

in client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java [231:341]


  /**
   * Requests shuffle server assignments for one shuffle and registers that shuffle on every
   * assigned server.
   *
   * <p>The assignment request and the registrations run inside {@code requestUgi.doAs(...)} and
   * are wrapped in {@code RetryUtils.retry(...)} using the configured retry interval and retry
   * count.
   *
   * @param partitionNum number of partitions, used both to compute the required number of shuffle
   *     servers and as an argument of the assignment request
   * @param shuffleId id of the shuffle to assign and register
   * @param keyClassName key class name put into the merge context; when blank, no merge context
   *     is sent ({@code null} is passed to {@code registerShuffle})
   * @param valueClassName value class name put into the merge context
   * @param comparatorClassName comparator class name put into the merge context
   * @return the assignments obtained from the client; the retried action yields {@code null} when
   *     the server-to-partition-ranges mapping is {@code null} or empty (presumably propagated
   *     unchanged by {@code RetryUtils.retry} - confirm against its implementation)
   * @throws RssException wrapping any {@link Throwable} raised while requesting assignments or
   *     registering the shuffle
   */
  private ShuffleAssignmentsInfo getShuffleWorks(
      int partitionNum,
      int shuffleId,
      String keyClassName,
      String valueClassName,
      String comparatorClassName) {
    ShuffleAssignmentsInfo shuffleAssignmentsInfo;
    int requiredAssignmentShuffleServersNum =
        RssTezUtils.getRequiredShuffleServerNumber(conf, 200, partitionNum);
    // retryInterval must be bigger than `rss.server.heartbeat.timeout`, otherwise a retry may
    // return the same result.
    long retryInterval =
        conf.getLong(
            RssTezConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL,
            RssTezConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);
    int retryTimes =
        conf.getInt(
            RssTezConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES,
            RssTezConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE);

    // Collect the configured server assignment tags, then add the default shuffle server version
    // tag and the client type tag.
    Set<String> assignmentTags = new HashSet<>();
    String rawTags = conf.get(RssTezConfig.RSS_CLIENT_ASSIGNMENT_TAGS, "");
    if (StringUtils.isNotEmpty(rawTags)) {
      // Trim every token individually and drop empty ones, so that values such as "a, b",
      // "a,,b" or a whitespace-only string never produce a padded or empty tag.
      for (String rawTag : rawTags.split(",")) {
        String tag = rawTag.trim();
        if (!tag.isEmpty()) {
          assignmentTags.add(tag);
        }
      }
    }
    assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);
    String clientType =
        conf.get(RssTezConfig.RSS_CLIENT_TYPE, RssTezConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE);
    ClientUtils.validateClientType(clientType);
    assignmentTags.add(clientType);

    try {
      shuffleAssignmentsInfo =
          RetryUtils.retry(
              // When communicating with TezRemoteShuffleUmbilicalProtocol, tez uses the
              // applicationId as ugi name. In a secure hdfs cluster, if we communicate with the
              // shuffle server using the applicationId ugi, the user of remote storage will be
              // application_xxx_xx. As we know, the max id of hadoop user is 16777215. So we
              // should use the execute ugi.
              () ->
                  requestUgi.doAs(
                      new PrivilegedExceptionAction<ShuffleAssignmentsInfo>() {
                        @Override
                        public ShuffleAssignmentsInfo run() throws Exception {
                          ShuffleAssignmentsInfo shuffleAssignments =
                              rssClient.getShuffleAssignments(
                                  appId,
                                  shuffleId,
                                  partitionNum,
                                  1,
                                  Sets.newHashSet(assignmentTags),
                                  requiredAssignmentShuffleServersNum,
                                  -1);

                          Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges =
                              shuffleAssignments.getServerToPartitionRanges();

                          // Nothing was assigned: there is nothing to register.
                          if (serverToPartitionRanges == null
                              || serverToPartitionRanges.isEmpty()) {
                            return null;
                          }
                          LOG.info("Start to register shuffle");
                          long start = System.currentTimeMillis();

                          // These values are identical for every server, so compute them once
                          // instead of once per registration. They are still evaluated inside
                          // the retried action, so failures keep being wrapped in RssException.
                          final int maxConcurrencyPerPartitionToWrite =
                              RssTezConfig.toRssConf(conf)
                                  .get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
                          final RssProtos.MergeContext mergeContext =
                              StringUtils.isBlank(keyClassName)
                                  ? null
                                  : RssProtos.MergeContext.newBuilder()
                                      .setKeyClass(keyClassName)
                                      .setValueClass(valueClassName)
                                      .setComparatorClass(comparatorClassName)
                                      .setMergedBlockSize(
                                          conf.getInt(
                                              RssTezConfig.RSS_MERGED_BLOCK_SZIE,
                                              RssTezConfig.RSS_MERGED_BLOCK_SZIE_DEFAULT))
                                      .setMergeClassLoader(
                                          conf.get(
                                              RssTezConfig.RSS_REMOTE_MERGE_CLASS_LOADER, ""))
                                      .build();

                          for (Map.Entry<ShuffleServerInfo, List<PartitionRange>> entry :
                              serverToPartitionRanges.entrySet()) {
                            rssClient.registerShuffle(
                                entry.getKey(),
                                appId,
                                shuffleId,
                                entry.getValue(),
                                remoteStorage,
                                ShuffleDataDistributionType.NORMAL,
                                maxConcurrencyPerPartitionToWrite,
                                0,
                                mergeContext);
                          }
                          LOG.info(
                              "Finish register shuffle with "
                                  + (System.currentTimeMillis() - start)
                                  + " ms");
                          return shuffleAssignments;
                        }
                      }),
              retryInterval,
              retryTimes);
    } catch (Throwable throwable) {
      LOG.error("registerShuffle failed!", throwable);
      throw new RssException("registerShuffle failed!", throwable);
    }

    return shuffleAssignmentsInfo;
  }