in client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java [100:225]
/**
 * Registers a shuffle with the remote shuffle service and builds the handle returned to Spark.
 *
 * <p>The method performs, in order:
 *
 * <ol>
 *   <li>a fail-fast check of the partition count against the configured block id layout;
 *   <li>a check that the active serializer supports relocation of serialized objects;
 *   <li>one-time initialization of the RSS application id;
 *   <li>a short-circuit for shuffles with zero partitions, which get an empty handle;
 *   <li>otherwise: remote storage lookup, shuffle server assignment, heartbeat start,
 *       bookkeeping of partition / map-task counts, optional registration of a mutable handle
 *       info, broadcast of the assignment, and posting of a {@code ShuffleAssignmentInfoEvent}.
 * </ol>
 *
 * @param shuffleId the id of the shuffle being registered
 * @param dependency the shuffle dependency supplying the partitioner and the parent RDD
 * @return an {@code RssShuffleHandle} carrying the application id, the number of map-side
 *     partitions and the broadcast shuffle handle info
 * @throws RssException if the partitioner has more partitions than the block id layout supports
 * @throws IllegalArgumentException if the configured serializer does not support relocation of
 *     serialized objects
 */
public <K, V, C> ShuffleHandle registerShuffle(
    int shuffleId, ShuffleDependency<K, V, C> dependency) {
  final int numPartitions = dependency.partitioner().numPartitions();

  // Fail fast if the number of partitions is not supported by the block id layout.
  if (numPartitions > blockIdLayout.maxNumPartitions) {
    throw new RssException(
        "Cannot register shuffle with "
            + numPartitions
            + " partitions because the configured block id layout supports at most "
            + blockIdLayout.maxNumPartitions
            + " partitions.");
  }

  // Spark has three kinds of serializers:
  //   org.apache.spark.serializer.JavaSerializer
  //   org.apache.spark.sql.execution.UnsafeRowSerializer
  //   org.apache.spark.serializer.KryoSerializer
  // Only JavaSerializer does not support relocation of serialized objects, so reject the
  // shuffle when the configured serializer lacks that capability.
  if (!SparkEnv.get().serializer().supportsRelocationOfSerializedObjects()) {
    throw new IllegalArgumentException(
        "Can't use serialized shuffle for shuffleId: "
            + shuffleId
            + ", because the"
            + " serializer: "
            + SparkEnv.get().serializer().getClass().getName()
            + " does not support object "
            + "relocation.");
  }

  // Initialize the RSS application id the first time a shuffle is registered. compareAndSet
  // keeps an id that was set concurrently; the value actually stored is the one propagated.
  if (id.get() == null) {
    id.compareAndSet(null, SparkEnv.get().conf().getAppId() + "_" + uuid);
    appId = id.get();
    dataPusher.setRssAppId(id.get());
  }
  LOG.info("Generate application id used in rss: " + id.get());

  // A shuffle without partitions needs no shuffle servers: record the counts and hand back a
  // handle with an empty assignment and no remote storage.
  if (numPartitions == 0) {
    shuffleIdToPartitionNum.putIfAbsent(shuffleId, 0);
    shuffleIdToNumMapTasks.computeIfAbsent(
        shuffleId, key -> dependency.rdd().partitions().length);
    LOG.info(
        "RegisterShuffle with ShuffleId["
            + shuffleId
            + "], partitionNum is 0, "
            + "return the empty RssShuffleHandle directly");
    Broadcast<SimpleShuffleHandleInfo> emptyHdlInfoBd =
        RssSparkShuffleUtils.broadcastShuffleHdlInfo(
            RssSparkShuffleUtils.getActiveSparkContext(),
            shuffleId,
            Collections.emptyMap(),
            RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
    return new RssShuffleHandle<>(
        shuffleId, id.get(), dependency.rdd().getNumPartitions(), dependency, emptyHdlInfoBd);
  }

  String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
  RemoteStorageInfo defaultRemoteStorage = getDefaultRemoteStorageInfo(sparkConf);
  RemoteStorageInfo remoteStorage =
      ClientUtils.fetchRemoteStorage(
          id.get(), defaultRemoteStorage, dynamicConfEnabled, storageType, shuffleWriteClient);

  // Validate the client type before any assignment is requested.
  ClientUtils.validateClientType(clientType);

  int requiredShuffleServerNumber =
      RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
  int estimateTaskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
  Map<Integer, List<ShuffleServerInfo>> partitionToServers =
      requestShuffleAssignment(
          shuffleId,
          numPartitions,
          1,
          requiredShuffleServerNumber,
          estimateTaskConcurrency,
          rssStageResubmitManager.getServerIdBlackList(),
          0);

  startHeartbeat();

  shuffleIdToPartitionNum.computeIfAbsent(shuffleId, key -> numPartitions);
  shuffleIdToNumMapTasks.computeIfAbsent(shuffleId, key -> dependency.rdd().partitions().length);

  // With the shuffle manager RPC service enabled, keep a mutable view of the assignment on the
  // driver. Stage retry for write failure takes priority and wraps that view in a
  // StageAttemptShuffleHandleInfo; partition reassignment registers the mutable view directly.
  if (shuffleManagerRpcServiceEnabled
      && (rssStageRetryForWriteFailureEnabled || partitionReassignEnabled)) {
    ShuffleHandleInfo mutableHandleInfo =
        new MutableShuffleHandleInfo(
            shuffleId, partitionToServers, remoteStorage, partitionSplitMode);
    if (rssStageRetryForWriteFailureEnabled) {
      StageAttemptShuffleHandleInfo stageAttemptHandleInfo =
          new StageAttemptShuffleHandleInfo(shuffleId, remoteStorage, mutableHandleInfo);
      shuffleHandleInfoManager.register(shuffleId, stageAttemptHandleInfo);
    } else {
      shuffleHandleInfoManager.register(shuffleId, mutableHandleInfo);
    }
  }

  Broadcast<SimpleShuffleHandleInfo> hdlInfoBd =
      RssSparkShuffleUtils.broadcastShuffleHdlInfo(
          RssSparkShuffleUtils.getActiveSparkContext(),
          shuffleId,
          partitionToServers,
          remoteStorage);
  LOG.info(
      "RegisterShuffle with ShuffleId["
          + shuffleId
          + "], partitionNum["
          + partitionToServers.size()
          + "], shuffleServerForResult: "
          + partitionToServers);

  // Post the assignment event carrying the distinct ids of all assigned shuffle servers.
  RssSparkShuffleUtils.getActiveSparkContext()
      .listenerBus()
      .post(
          new ShuffleAssignmentInfoEvent(
              shuffleId,
              new ArrayList<>(
                  partitionToServers.values().stream()
                      .flatMap(servers -> servers.stream())
                      .map(server -> server.getId())
                      .collect(Collectors.toSet()))));

  return new RssShuffleHandle<>(
      shuffleId, id.get(), dependency.rdd().getNumPartitions(), dependency, hdlInfoBd);
}