in client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java [231:341]
/**
 * Requests shuffle-server assignments for a shuffle from the coordinator and registers the
 * shuffle on every assigned server.
 *
 * <p>The assignment request and the registrations run as {@code requestUgi} inside {@link
 * RetryUtils#retry}, using the configured assignment retry interval and retry count.
 *
 * @param partitionNum number of partitions to assign
 * @param shuffleId id of the shuffle being registered
 * @param keyClassName key class for remote merge; when blank, no merge context is sent
 * @param valueClassName value class for remote merge (only used when {@code keyClassName} is not
 *     blank)
 * @param comparatorClassName comparator class for remote merge (only used when {@code
 *     keyClassName} is not blank)
 * @return the assignments obtained from the coordinator; may be {@code null} because the retried
 *     call returns {@code null} when the coordinator reports no server-to-partition ranges
 * @throws RssException if requesting the assignments or registering the shuffle fails
 */
private ShuffleAssignmentsInfo getShuffleWorks(
    int partitionNum,
    int shuffleId,
    String keyClassName,
    String valueClassName,
    String comparatorClassName) {
  int requiredAssignmentShuffleServersNum =
      RssTezUtils.getRequiredShuffleServerNumber(conf, 200, partitionNum);
  // retryInterval must be greater than `rss.server.heartbeat.timeout`; otherwise a retry may
  // return the same result as the attempt that failed.
  long retryInterval =
      conf.getLong(
          RssTezConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL,
          RssTezConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);
  int retryTimes =
      conf.getInt(
          RssTezConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES,
          RssTezConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE);
  // Resolved outside the try block, as before: an invalid client type is not wrapped in
  // RssException and is not retried.
  Set<String> assignmentTags = resolveAssignmentTags();

  ShuffleAssignmentsInfo shuffleAssignmentsInfo;
  try {
    shuffleAssignmentsInfo =
        RetryUtils.retry(
            // When communicating through TezRemoteShuffleUmbilicalProtocol, Tez uses the
            // applicationId as the UGI name. In a secure HDFS cluster, talking to the shuffle
            // server as that UGI would make the remote-storage user application_xxx_xx, and the
            // max id of a Hadoop user is 16777215, so the executing UGI is used instead.
            () ->
                requestUgi.doAs(
                    new PrivilegedExceptionAction<ShuffleAssignmentsInfo>() {
                      @Override
                      public ShuffleAssignmentsInfo run() throws Exception {
                        ShuffleAssignmentsInfo shuffleAssignments =
                            rssClient.getShuffleAssignments(
                                appId,
                                shuffleId,
                                partitionNum,
                                1,
                                Sets.newHashSet(assignmentTags),
                                requiredAssignmentShuffleServersNum,
                                -1);
                        Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges =
                            shuffleAssignments.getServerToPartitionRanges();
                        if (serverToPartitionRanges == null
                            || serverToPartitionRanges.isEmpty()) {
                          // NOTE(review): returning null ends this attempt without an
                          // exception; confirm whether RetryUtils retries on a null result and
                          // that callers of getShuffleWorks handle null.
                          return null;
                        }
                        LOG.info("Start to register shuffle");
                        long start = System.currentTimeMillis();
                        // Loop-invariant values: converting the whole Hadoop conf and building
                        // the merge context once per attempt instead of once per server.
                        Integer maxConcurrencyPerPartitionToWrite =
                            RssTezConfig.toRssConf(conf)
                                .get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
                        RssProtos.MergeContext mergeContext =
                            buildRemoteMergeContext(
                                keyClassName, valueClassName, comparatorClassName);
                        for (Map.Entry<ShuffleServerInfo, List<PartitionRange>> entry :
                            serverToPartitionRanges.entrySet()) {
                          rssClient.registerShuffle(
                              entry.getKey(),
                              appId,
                              shuffleId,
                              entry.getValue(),
                              remoteStorage,
                              ShuffleDataDistributionType.NORMAL,
                              maxConcurrencyPerPartitionToWrite,
                              0,
                              mergeContext);
                        }
                        LOG.info(
                            "Finish register shuffle with "
                                + (System.currentTimeMillis() - start)
                                + " ms");
                        return shuffleAssignments;
                      }
                    }),
            retryInterval,
            retryTimes);
  } catch (Throwable throwable) {
    LOG.error("registerShuffle failed!", throwable);
    throw new RssException("registerShuffle failed!", throwable);
  }
  return shuffleAssignmentsInfo;
}

/**
 * Builds the server assignment tags: the comma-separated tags configured under {@code
 * RSS_CLIENT_ASSIGNMENT_TAGS}, plus {@code Constants.SHUFFLE_SERVER_VERSION} and the validated
 * client type.
 *
 * <p>Each configured tag is trimmed individually, so {@code "a, b"} yields {@code a} and {@code
 * b}, and empty entries such as those in {@code "a,,b"} are dropped.
 *
 * @return a new mutable set of assignment tags
 */
private Set<String> resolveAssignmentTags() {
  Set<String> assignmentTags = new HashSet<>();
  String rawTags = conf.get(RssTezConfig.RSS_CLIENT_ASSIGNMENT_TAGS, "");
  if (StringUtils.isNotEmpty(rawTags)) {
    Arrays.stream(rawTags.split(","))
        .map(String::trim)
        .filter(StringUtils::isNotEmpty)
        .forEach(assignmentTags::add);
  }
  assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);
  String clientType =
      conf.get(RssTezConfig.RSS_CLIENT_TYPE, RssTezConfig.RSS_CLIENT_TYPE_DEFAULT_VALUE);
  ClientUtils.validateClientType(clientType);
  assignmentTags.add(clientType);
  return assignmentTags;
}

/**
 * Builds the remote-merge context sent with shuffle registration.
 *
 * @return {@code null} when {@code keyClassName} is blank (no remote merge), otherwise a context
 *     carrying the key/value/comparator classes plus the configured merged block size and merge
 *     class loader
 */
private RssProtos.MergeContext buildRemoteMergeContext(
    String keyClassName, String valueClassName, String comparatorClassName) {
  if (StringUtils.isBlank(keyClassName)) {
    return null;
  }
  return RssProtos.MergeContext.newBuilder()
      .setKeyClass(keyClassName)
      .setValueClass(valueClassName)
      .setComparatorClass(comparatorClassName)
      .setMergedBlockSize(
          conf.getInt(
              RssTezConfig.RSS_MERGED_BLOCK_SZIE, RssTezConfig.RSS_MERGED_BLOCK_SZIE_DEFAULT))
      .setMergeClassLoader(conf.get(RssTezConfig.RSS_REMOTE_MERGE_CLASS_LOADER, ""))
      .build();
}