in client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java [73:190]
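/**
 * Registers a shuffle with the remote shuffle service: validates the partition count and
 * serializer, lazily initializes the unique application id, requests a shuffle server
 * assignment from the coordinator, and broadcasts the resulting handle info to executors.
 */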
public <K, V, C> ShuffleHandle registerShuffle(
int shuffleId, int numMaps, ShuffleDependency<K, V, C> dependency) {
// fail fast if number of partitions is not supported by block id layout
if (dependency.partitioner().numPartitions() > blockIdLayout.maxNumPartitions) {
throw new RssException(
"Cannot register shuffle with "
+ dependency.partitioner().numPartitions()
+ " partitions because the configured block id layout supports at most "
+ blockIdLayout.maxNumPartitions
+ " partitions.");
}
// Spark has three kinds of serializers:
// org.apache.spark.serializer.JavaSerializer
// org.apache.spark.sql.execution.UnsafeRowSerializer
// org.apache.spark.serializer.KryoSerializer
// Only org.apache.spark.serializer.JavaSerializer does not support relocation of
// serialized objects, so throw an exception when the configured serializer lacks that
// support.
if (!SparkEnv.get().serializer().supportsRelocationOfSerializedObjects()) {
throw new IllegalArgumentException(
"Can't use serialized shuffle for shuffleId: "
+ shuffleId
+ ", because the serializer: "
+ SparkEnv.get().serializer().getClass().getName()
+ " does not support object relocation.");
}
// If YARN retries the ApplicationMaster, the appId is reused and shuffle data from
// different attempts would collide; appending a uuid to the appId avoids that.
// The appId can't be fetched in the constructor because SparkEnv is not created yet,
// so it is initialized exactly once here, even though this method is called once per
// shuffle stage.
if ("".equals(appId)) {
appId = SparkEnv.get().conf().getAppId() + "_" + uuid;
dataPusher.setRssAppId(appId);
LOG.info("Generate application id used in rss: " + appId);
}
if (dependency.partitioner().numPartitions() == 0) {
shuffleIdToPartitionNum.putIfAbsent(shuffleId, 0);
shuffleIdToNumMapTasks.computeIfAbsent(
shuffleId, key -> dependency.rdd().partitions().length);
LOG.info(
"RegisterShuffle with ShuffleId[{}], partitionNum is 0, return an empty RssShuffleHandle directly",
shuffleId);
Broadcast<SimpleShuffleHandleInfo> hdlInfoBd =
RssSparkShuffleUtils.broadcastShuffleHdlInfo(
RssSparkShuffleUtils.getActiveSparkContext(),
shuffleId,
Collections.emptyMap(),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE);
return new RssShuffleHandle<>(
shuffleId, appId, dependency.rdd().getNumPartitions(), dependency, hdlInfoBd);
}
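// Resolve the storage type and the remote storage for this app. With dynamicConfEnabled
// set, ClientUtils.fetchRemoteStorage is expected to prefer the coordinator-provided
// storage over the SparkConf default; that reading is inferred from the arguments here.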
String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
RemoteStorageInfo defaultRemoteStorage = getDefaultRemoteStorageInfo(sparkConf);
RemoteStorageInfo remoteStorage =
ClientUtils.fetchRemoteStorage(
appId, defaultRemoteStorage, dynamicConfEnabled, storageType, shuffleWriteClient);
// Gather the info needed to register this shuffle according to the coordinator's
// assignment response.
Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
ClientUtils.validateClientType(clientType);
assignmentTags.add(clientType);
int requiredShuffleServerNumber =
RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);
int estimateTaskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
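// A note on the literals in the call below: 1 is taken to be the number of partitions
// per assignment range and the trailing 0 the initial stage attempt number; both
// readings are assumptions from the call shape, not verified against the client API.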
Map<Integer, List<ShuffleServerInfo>> partitionToServers =
requestShuffleAssignment(
shuffleId,
dependency.partitioner().numPartitions(),
1,
requiredShuffleServerNumber,
estimateTaskConcurrency,
rssStageResubmitManager.getServerIdBlackList(),
0);
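// Start periodic heartbeats so the registration of this app stays alive on the
// coordinator side.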
startHeartbeat();
shuffleIdToPartitionNum.computeIfAbsent(
shuffleId, key -> dependency.partitioner().numPartitions());
shuffleIdToNumMapTasks.computeIfAbsent(shuffleId, key -> dependency.rdd().partitions().length);
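// When the shuffle manager RPC service is enabled, register a mutable handle so the
// assignment can be updated later: wrapped per stage attempt when stage retry for
// write failures is enabled, or used directly when partition reassignment is enabled.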
if (shuffleManagerRpcServiceEnabled && rssStageRetryForWriteFailureEnabled) {
ShuffleHandleInfo handleInfo =
new MutableShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
StageAttemptShuffleHandleInfo stageAttemptShuffleHandleInfo =
new StageAttemptShuffleHandleInfo(shuffleId, remoteStorage, handleInfo);
shuffleHandleInfoManager.register(shuffleId, stageAttemptShuffleHandleInfo);
} else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
ShuffleHandleInfo shuffleHandleInfo =
new MutableShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
shuffleHandleInfoManager.register(shuffleId, shuffleHandleInfo);
}
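// Broadcast the handle info so executors can look up their assigned shuffle servers.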
Broadcast<SimpleShuffleHandleInfo> hdlInfoBd =
RssSparkShuffleUtils.broadcastShuffleHdlInfo(
RssSparkShuffleUtils.getActiveSparkContext(),
shuffleId,
partitionToServers,
remoteStorage);
LOG.info(
"RegisterShuffle with ShuffleId[{}], partitionNum[{}], servers: {}",
shuffleId,
partitionToServers.size(),
partitionToServers);
return new RssShuffleHandle<>(shuffleId, appId, numMaps, dependency, hdlInfoBd);
}