in client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java [171:257]
/**
 * Requests shuffle-server assignments for one shuffle from the coordinator and registers the
 * shuffle on every assigned server. The request-and-register sequence is run through {@code
 * RetryUtils.retry} using the configured retry interval and retry count.
 *
 * @param partitionNum number of partitions to request assignments for
 * @param shuffleId id of the shuffle being assigned and registered
 * @return the assignments returned by the coordinator, or {@code null} when the coordinator
 *     returned a null or empty server-to-partition-range mapping
 * @throws RssException if assignment or registration fails (wrapping the underlying cause)
 */
private ShuffleAssignmentsInfo getShuffleWorks(int partitionNum, int shuffleId) {
  // NOTE(review): the literal 200 is the second argument of getRequiredShuffleServerNumber; its
  // meaning is defined in RssTezUtils (presumably an estimated task count) -- confirm there.
  int requiredAssignmentShuffleServersNum =
      RssTezUtils.getRequiredShuffleServerNumber(conf, 200, partitionNum);
  // retryInterval must be bigger than `rss.server.heartbeat.timeout`, otherwise a retry may
  // get back the same assignment result.
  long retryInterval =
      conf.getLong(
          RssTezConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL,
          RssTezConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);
  int retryTimes =
      conf.getInt(
          RssTezConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES,
          RssTezConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE);

  // Configured (comma-separated) assignment tags plus the default shuffle-server version tag.
  // Each tag is trimmed and empty entries are skipped, so values like "a, b" or "a,,b" do not
  // yield tags such as " b" or "" that no shuffle server would match.
  Set<String> assignmentTags = new HashSet<>();
  String rawTags = conf.get(RssTezConfig.RSS_CLIENT_ASSIGNMENT_TAGS, "");
  if (StringUtils.isNotEmpty(rawTags)) {
    Arrays.stream(rawTags.split(","))
        .map(String::trim)
        .filter(StringUtils::isNotEmpty)
        .forEach(assignmentTags::add);
  }
  assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);

  // Resolve the remote storage, fetching it from the coordinator if dynamic client conf is on.
  boolean dynamicConfEnabled =
      conf.getBoolean(
          RssTezConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED,
          RssTezConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE);
  RemoteStorageInfo defaultRemoteStorage =
      new RemoteStorageInfo(conf.get(RssTezConfig.RSS_REMOTE_STORAGE_PATH, ""));
  String storageType =
      conf.get(RssTezConfig.RSS_STORAGE_TYPE, RssTezConfig.RSS_STORAGE_TYPE_DEFAULT_VALUE);
  boolean testMode = conf.getBoolean(RssTezConfig.RSS_TEST_MODE_ENABLE, false);
  ClientUtils.validateTestModeConf(testMode, storageType);
  RemoteStorageInfo remoteStorage =
      ClientUtils.fetchRemoteStorage(
          appId, defaultRemoteStorage, dynamicConfEnabled, storageType, rssClient);

  ShuffleAssignmentsInfo shuffleAssignmentsInfo;
  try {
    // Invariant across servers and retry attempts: convert the Tez conf to an RssConf once,
    // rather than once per registered server on every attempt.
    int maxConcurrencyPerPartitionToWrite =
        RssTezConfig.toRssConf(conf).get(MAX_CONCURRENCY_PER_PARTITION_TO_WRITE);
    shuffleAssignmentsInfo =
        RetryUtils.retry(
            () -> {
              ShuffleAssignmentsInfo shuffleAssignments =
                  rssClient.getShuffleAssignments(
                      appId,
                      shuffleId,
                      partitionNum,
                      1,
                      Sets.newHashSet(assignmentTags),
                      requiredAssignmentShuffleServersNum,
                      -1);
              Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges =
                  shuffleAssignments.getServerToPartitionRanges();
              if (serverToPartitionRanges == null || serverToPartitionRanges.isEmpty()) {
                // NOTE(review): this null is returned (not thrown), so it presumably does not
                // trigger a retry and becomes this method's return value -- callers must
                // handle null.
                return null;
              }
              LOG.info("Start to register shuffle");
              long start = System.currentTimeMillis();
              serverToPartitionRanges.forEach(
                  (server, partitionRanges) ->
                      rssClient.registerShuffle(
                          server,
                          appId,
                          shuffleId,
                          partitionRanges,
                          remoteStorage,
                          ShuffleDataDistributionType.NORMAL,
                          maxConcurrencyPerPartitionToWrite));
              LOG.info("Finish register shuffle with {} ms", System.currentTimeMillis() - start);
              return shuffleAssignments;
            },
            retryInterval,
            retryTimes);
  } catch (Throwable throwable) {
    LOG.error("registerShuffle failed!", throwable);
    throw new RssException("registerShuffle failed!", throwable);
  }
  return shuffleAssignmentsInfo;
}