in client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java [1001:1122]
public MutableShuffleHandleInfo reassignOnBlockSendFailure(
int stageId,
int stageAttemptNumber,
int shuffleId,
Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers,
boolean partitionSplit) {
long startTime = System.currentTimeMillis();
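// Resolve the mutable handle: when stage retry is enabled the handle is wrapped in a
// StageAttemptShuffleHandleInfo, so unwrap it to get the current mutable view.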
ShuffleHandleInfo handleInfo = shuffleHandleInfoManager.get(shuffleId);
MutableShuffleHandleInfo internalHandle = null;
if (handleInfo instanceof MutableShuffleHandleInfo) {
internalHandle = (MutableShuffleHandleInfo) handleInfo;
} else if (handleInfo instanceof StageAttemptShuffleHandleInfo) {
internalHandle =
(MutableShuffleHandleInfo) ((StageAttemptShuffleHandleInfo) handleInfo).getCurrent();
}
if (internalHandle == null) {
throw new RssException(
"Unexpected state: no MutableShuffleHandleInfo found for shuffleId: " + shuffleId);
}
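// Serialize concurrent reassignment requests on the same handle, so tasks that report
// the same failed server reuse one replacement instead of racing for new ones.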
synchronized (internalHandle) {
// Fail fast if the number of reassignment servers for any partition exceeds
// the max reassign server num.
if (!partitionSplit) {
// Do not check the partition reassign server num for partition split case
internalHandle.checkPartitionReassignServerNum(
partitionToFailureServers.keySet(), partitionReassignMaxServerNum);
}
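// Replacement servers that were newly requested, and therefore still need to be
// registered for the affected partitions before the method returns.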
Map<ShuffleServerInfo, List<PartitionRange>> newServerToPartitions = new HashMap<>();
// receivingFailureServer -> partitionId -> replacementServerIds. For logging
Map<String, Map<Integer, Set<String>>> reassignResult = new HashMap<>();
for (Map.Entry<Integer, List<ReceivingFailureServer>> entry :
partitionToFailureServers.entrySet()) {
int partitionId = entry.getKey();
for (ReceivingFailureServer receivingFailureServer : entry.getValue()) {
StatusCode code = receivingFailureServer.getStatusCode();
String serverId = receivingFailureServer.getServerId();
boolean serverHasReplaced = false;
Set<ShuffleServerInfo> updatedReassignServers;
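// Plain block-send failure (no partition split): replace the faulty server for this
// partition, reusing a replacement if one was already assigned for that server.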
if (!partitionSplit) {
Set<ShuffleServerInfo> replacements = internalHandle.getReplacements(serverId);
if (CollectionUtils.isEmpty(replacements)) {
replacements =
requestReassignServer(
stageId,
stageAttemptNumber,
shuffleId,
internalHandle,
partitionId,
serverId);
} else {
serverHasReplaced = true;
}
updatedReassignServers =
internalHandle.updateAssignment(partitionId, serverId, replacements);
} else {
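// Partition split: assign additional server(s) for this partition rather than
// replacing the faulty one; LOAD_BALANCE mode spreads the split across several servers.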
// TODO: in LOAD_BALANCE mode, ensure the same partition cannot trigger multiple
// concurrent reassignments.
int requireServerNum = 1;
if (partitionSplitMode == PartitionSplitMode.LOAD_BALANCE) {
requireServerNum = partitionSplitLoadBalanceServerNum;
}
Set<ShuffleServerInfo> replacements =
internalHandle.getReplacementsForPartition(partitionId, serverId);
if (CollectionUtils.isEmpty(replacements)) {
replacements =
requestReassignServer(
stageId,
stageAttemptNumber,
shuffleId,
internalHandle,
partitionId,
serverId,
requireServerNum);
} else {
serverHasReplaced = true;
}
updatedReassignServers =
internalHandle.updateAssignmentOnPartitionSplit(
partitionId, serverId, replacements);
}
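// Record the updated assignment for logging, and remember newly replaced servers
// so they can be registered below.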
if (!updatedReassignServers.isEmpty()) {
reassignResult
.computeIfAbsent(serverId, x -> new HashMap<>())
.computeIfAbsent(partitionId, x -> new HashSet<>())
.addAll(
updatedReassignServers.stream()
.map(ShuffleServerInfo::getId)
.collect(Collectors.toSet()));
if (serverHasReplaced) {
for (ShuffleServerInfo serverInfo : updatedReassignServers) {
newServerToPartitions
.computeIfAbsent(serverInfo, x -> new ArrayList<>())
.add(new PartitionRange(partitionId, partitionId));
}
}
}
}
}
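// Newly introduced replacement servers must be registered before they can accept
// shuffle data for the reassigned partitions.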
if (!newServerToPartitions.isEmpty()) {
LOG.info(
"Register the new partition->servers assignment on reassign. {}",
newServerToPartitions);
registerShuffleServers(
getAppId(), shuffleId, newServerToPartitions, getRemoteStorageInfo());
}
LOG.info(
"Finished reassignOnBlockSendFailure request in {} ms. Partition split: {}. Reassign result: {}",
System.currentTimeMillis() - startTime,
partitionSplit,
reassignResult);
return internalHandle;
}
}