in client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java [330:436]
public <K, V, C> ShuffleHandle registerShuffle(
int shuffleId, ShuffleDependency<K, V, C> dependency) {
// Spark has three kinds of serializers:
// org.apache.spark.serializer.JavaSerializer
// org.apache.spark.sql.execution.UnsafeRowSerializer
// org.apache.spark.serializer.KryoSerializer
// Only org.apache.spark.serializer.JavaSerializer does not support relocation of serialized
// objects, so when the configured serializer is JavaSerializer we throw an exception.
if (!SparkEnv.get().serializer().supportsRelocationOfSerializedObjects()) {
throw new IllegalArgumentException(
"Can't use serialized shuffle for shuffleId: "
+ shuffleId
+ ", because the serializer: "
+ SparkEnv.get().serializer().getClass().getName()
+ " does not support object relocation.");
}
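// Lazily build the application id used in rss (Spark app id + uuid) exactly once and share it
// with the data pusher.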
if (id.get() == null) {
id.compareAndSet(null, SparkEnv.get().conf().getAppId() + "_" + uuid);
dataPusher.setRssAppId(id.get());
}
LOG.info("Generated application id used in rss: " + id.get());
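// A dependency with zero partitions has nothing to shuffle: record the bookkeeping maps and
// return an empty handle without requesting any server assignments.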
if (dependency.partitioner().numPartitions() == 0) {
shuffleIdToPartitionNum.putIfAbsent(shuffleId, 0);
shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);
LOG.info(
"RegisterShuffle with ShuffleId["
+ shuffleId
+ "], partitionNum is 0, "
+ "returning an empty RssShuffleHandle directly");
Broadcast<ShuffleHandleInfo> hdlInfoBd =
RssSparkShuffleUtils.broadcastShuffleHdlInfo(
RssSparkShuffleUtils.getActiveSparkContext(),
shuffleId,
Collections.emptyMap(),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
return new RssShuffleHandle<>(
shuffleId, id.get(), dependency.rdd().getNumPartitions(), dependency, hdlInfoBd);
}
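// Resolve the remote storage for this app; when dynamic conf is enabled, the coordinator-side
// setting takes precedence over the locally configured default path.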
String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
RemoteStorageInfo defaultRemoteStorage =
new RemoteStorageInfo(sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""));
RemoteStorageInfo remoteStorage =
ClientUtils.fetchRemoteStorage(
id.get(), defaultRemoteStorage, dynamicConfEnabled, storageType, shuffleWriteClient);
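// Tag the assignment request (including the client type) so the coordinator only assigns
// shuffle servers matching these tags.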
Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
ClientUtils.validateClientType(clientType);
assignmentTags.add(clientType);
int requiredShuffleServerNumber =
RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
// retryInterval must be larger than `rss.server.heartbeat.interval`, otherwise a retry may
// keep returning the same assignment result
long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
int estimateTaskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
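// Fetch partition -> shuffle server assignments from the coordinator and register the
// corresponding partition ranges on each assigned server; both steps are retried as a unit.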
Map<Integer, List<ShuffleServerInfo>> partitionToServers;
try {
partitionToServers =
RetryUtils.retry(
() -> {
ShuffleAssignmentsInfo response =
shuffleWriteClient.getShuffleAssignments(
id.get(),
shuffleId,
dependency.partitioner().numPartitions(),
1,
assignmentTags,
requiredShuffleServerNumber,
estimateTaskConcurrency);
registerShuffleServers(
id.get(), shuffleId, response.getServerToPartitionRanges(), remoteStorage);
return response.getPartitionToServers();
},
retryInterval,
retryTimes);
} catch (Throwable throwable) {
throw new RssException("registerShuffle failed!", throwable);
}
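// Start the periodic heartbeat that keeps this application's registration alive.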
startHeartbeat();
shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions());
shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);
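// Broadcast the handle info so executors can look up the partition -> server mapping instead
// of serializing it with every task.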
Broadcast<ShuffleHandleInfo> hdlInfoBd =
RssSparkShuffleUtils.broadcastShuffleHdlInfo(
RssSparkShuffleUtils.getActiveSparkContext(),
shuffleId,
partitionToServers,
remoteStorage);
LOG.info(
"RegisterShuffle with ShuffleId["
+ shuffleId
+ "], partitionNum["
+ partitionToServers.size()
+ "], shuffleServerForResult: "
+ partitionToServers);
return new RssShuffleHandle<>(
shuffleId, id.get(), dependency.rdd().getNumPartitions(), dependency, hdlInfoBd);
}