in client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java [659:744]
/**
 * Reports the block ids written by a task attempt to the shuffle servers that own each
 * partition, then verifies that every partition was acknowledged by enough servers.
 *
 * <p>Partitions are first grouped by server so that each server receives at most one request
 * carrying all of its partitions that have a non-empty block id list. A server with nothing to
 * report is skipped. A failed report (non-success status or a thrown exception) is logged at
 * warn level and does not stop the remaining servers from being contacted; failures are only
 * surfaced by the final quorum check.
 *
 * @param partitionToServers partition id to the servers assigned to that partition
 * @param appId application id placed in each request and in log/exception messages
 * @param shuffleId shuffle id placed in each request and in log/exception messages
 * @param taskAttemptId task attempt id placed in each request
 * @param partitionToBlockIds partition id to the block ids to report; partitions that are
 *     missing or mapped to an empty list are neither reported nor quorum-checked
 * @param bitmapNum forwarded unchanged to each report request
 * @throws RssException if any partition with block ids was successfully reported to fewer than
 *     {@code replicaWrite} servers
 */
public void reportShuffleResult(
    Map<Integer, List<ShuffleServerInfo>> partitionToServers,
    String appId,
    int shuffleId,
    long taskAttemptId,
    Map<Integer, List<Long>> partitionToBlockIds,
    int bitmapNum) {
  // server -> partitions assigned to it
  Map<ShuffleServerInfo, List<Integer>> groupedPartitions = Maps.newHashMap();
  // partition -> number of servers that acknowledged the report with SUCCESS
  Map<Integer, Integer> partitionReportTracker = Maps.newHashMap();
  for (Map.Entry<Integer, List<ShuffleServerInfo>> entry : partitionToServers.entrySet()) {
    int partitionIdx = entry.getKey();
    for (ShuffleServerInfo ssi : entry.getValue()) {
      groupedPartitions.computeIfAbsent(ssi, key -> Lists.newArrayList()).add(partitionIdx);
    }
    // Only partitions that actually have blocks take part in the quorum check.
    if (CollectionUtils.isNotEmpty(partitionToBlockIds.get(partitionIdx))) {
      partitionReportTracker.putIfAbsent(partitionIdx, 0);
    }
  }

  for (Map.Entry<ShuffleServerInfo, List<Integer>> entry : groupedPartitions.entrySet()) {
    Map<Integer, List<Long>> requestBlockIds = Maps.newHashMap();
    for (Integer partitionId : entry.getValue()) {
      List<Long> blockIds = partitionToBlockIds.get(partitionId);
      if (CollectionUtils.isNotEmpty(blockIds)) {
        requestBlockIds.put(partitionId, blockIds);
      }
    }
    if (requestBlockIds.isEmpty()) {
      // Nothing to report to this server.
      continue;
    }
    RssReportShuffleResultRequest request =
        new RssReportShuffleResultRequest(
            appId, shuffleId, taskAttemptId, requestBlockIds, bitmapNum);
    ShuffleServerInfo ssi = entry.getKey();
    try {
      RssReportShuffleResultResponse response =
          getShuffleServerClient(ssi).reportShuffleResult(request);
      if (response.getStatusCode() == StatusCode.SUCCESS) {
        LOG.info(
            "Report shuffle result to "
                + ssi
                + " for appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "] successfully");
        // Credit every partition carried by this request with one successful report.
        for (Integer partitionId : requestBlockIds.keySet()) {
          partitionReportTracker.merge(partitionId, 1, Integer::sum);
        }
      } else {
        LOG.warn(
            "Report shuffle result to "
                + ssi
                + " for appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "] failed with "
                + response.getStatusCode());
      }
    } catch (Exception e) {
      // Best effort per server: keep going, but record the cause so the failure is diagnosable.
      LOG.warn(
          "Report shuffle result is failed to "
              + ssi
              + " for appId["
              + appId
              + "], shuffleId["
              + shuffleId
              + "]",
          e);
    }
  }

  // quorum check
  for (Map.Entry<Integer, Integer> entry : partitionReportTracker.entrySet()) {
    if (entry.getValue() < replicaWrite) {
      throw new RssException(
          "Quorum check of report shuffle result is failed for appId["
              + appId
              + "], shuffleId["
              + shuffleId
              + "], partitionId["
              + entry.getKey()
              + "], successful reports["
              + entry.getValue()
              + "], required["
              + replicaWrite
              + "]");
    }
  }
}