in client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java [310:429]
public SendShuffleDataResult sendShuffleData(
String appId,
List<ShuffleBlockInfo> shuffleBlockInfoList,
Supplier<Boolean> needCancelRequest) {
// shuffleServer -> shuffleId -> partitionId -> blocks
Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>>
primaryServerToBlocks = Maps.newHashMap();
Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>>
secondaryServerToBlocks = Maps.newHashMap();
Map<ShuffleServerInfo, List<Long>> primaryServerToBlockIds = Maps.newHashMap();
Map<ShuffleServerInfo, List<Long>> secondaryServerToBlockIds = Maps.newHashMap();
// send shuffle blocks to the shuffle servers
// for every ShuffleBlockInfo, build the structure shuffleServer -> shuffleId ->
// partitionId -> blocks,
// which makes it easy to batch the RPC requests per shuffle server
// In order to reduce the data to send in the quorum protocol,
// we split these blocks into two rounds: primary and secondary.
// The primary round contains the [0, replicaWrite) replicas,
// which is the minimum number needed when no sending server fails.
// The secondary round contains the [replicaWrite, replica) replicas,
// which is the minimum number needed when at most *replica - replicaWrite* sending servers
// fail.
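// Illustrative example (the concrete values are assumed here, not read from config):
// with replica = 3 and replicaWrite = 2, the primary round sends each block to 2 servers,
// and the secondary round holds the remaining 1 server, which is used only when the primary
// round cannot reach replicaWrite successful sends for that block.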
for (ShuffleBlockInfo sbi : shuffleBlockInfoList) {
List<ShuffleServerInfo> allServers = sbi.getShuffleServerInfos();
if (replicaSkipEnabled) {
Set<ShuffleServerInfo> excludeServers = Sets.newHashSet();
genServerToBlocks(
sbi,
allServers,
replicaWrite,
excludeServers,
primaryServerToBlocks,
primaryServerToBlockIds,
true);
genServerToBlocks(
sbi,
allServers,
replica - replicaWrite,
excludeServers,
secondaryServerToBlocks,
secondaryServerToBlockIds,
false);
} else {
// When replicaSkip is disabled, we send data to all replicas within one round.
genServerToBlocks(
sbi,
allServers,
allServers.size(),
null,
primaryServerToBlocks,
primaryServerToBlockIds,
false);
}
}
// track, per block, how many servers have successfully received it
// a concurrent hashmap is unnecessary here because no other thread inserts or deletes entries;
// AtomicInteger is enough to make the per-block counter updates visible across threads
Map<Long, AtomicInteger> blockIdsTracker = Maps.newHashMap();
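// each counter starts at zero; sendShuffleDataAsync is expected to increment a block's counter
// once for each server that successfully receives the block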
primaryServerToBlockIds
.values()
.forEach(
blockList ->
blockList.forEach(block -> blockIdsTracker.put(block, new AtomicInteger(0))));
secondaryServerToBlockIds
.values()
.forEach(
blockList ->
blockList.forEach(block -> blockIdsTracker.put(block, new AtomicInteger(0))));
Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
Set<Long> successBlockIds = Sets.newConcurrentHashSet();
// if sending a block fails, the task will fail
// todo: better to have a fallback solution when sending to multiple servers
// send the primary round of blocks.
boolean isAllSuccess =
sendShuffleDataAsync(
appId,
primaryServerToBlocks,
primaryServerToBlockIds,
blockIdsTracker,
secondaryServerToBlocks.isEmpty(),
needCancelRequest);
// The secondary round is sent only when some sends in the primary round failed,
// which should be infrequent.
// Even though the secondary round may push a block to more than replicaWrite replicas,
// we do not apply complicated skipping logic here, because server crashes are rare in
// production environments.
if (!isAllSuccess && !secondaryServerToBlocks.isEmpty() && !needCancelRequest.get()) {
LOG.info("The sending of primary round is failed partially, so start the secondary round");
sendShuffleDataAsync(
appId,
secondaryServerToBlocks,
secondaryServerToBlockIds,
blockIdsTracker,
true,
needCancelRequest);
}
// classify each block as successful or failed according to replicaWrite
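// e.g. with replicaWrite = 2 (an illustrative value), a block acknowledged by at least two
// servers is successful; otherwise it is reported as failed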
blockIdsTracker.forEach(
(blockId, counter) -> {
if (counter.get() >= replicaWrite) {
successBlockIds.add(blockId);
} else {
failedBlockIds.add(blockId);
}
});
return new SendShuffleDataResult(successBlockIds, failedBlockIds);
}
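
For reference, here is a minimal, self-contained sketch of the two-round quorum bookkeeping this method implements. The class name QuorumSketch, the helper addAcks, the hard-coded acknowledgement counts, and the values replica = 3 / replicaWrite = 2 are assumptions for illustration only; it uses plain JDK types rather than the Uniffle classes above.

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

public class QuorumSketch {
  // Illustrative values: each block has 3 replicas and needs 2 successful sends.
  static final int REPLICA = 3;
  static final int REPLICA_WRITE = 2;

  public static void main(String[] args) {
    List<Long> blockIds = List.of(1L, 2L, 3L);
    Map<Long, AtomicInteger> blockIdsTracker = new HashMap<>();
    blockIds.forEach(id -> blockIdsTracker.put(id, new AtomicInteger(0)));

    // Primary round: pretend block 3 reached only 1 of its 2 primary servers.
    addAcks(blockIdsTracker, 1L, 2);
    addAcks(blockIdsTracker, 2L, 2);
    addAcks(blockIdsTracker, 3L, 1);

    boolean isAllSuccess =
        blockIdsTracker.values().stream().allMatch(c -> c.get() >= REPLICA_WRITE);
    if (!isAllSuccess) {
      // Secondary round: the remaining REPLICA - REPLICA_WRITE replica accepts block 3.
      addAcks(blockIdsTracker, 3L, 1);
    }

    // Classify blocks against the replicaWrite quorum, as the method above does.
    Set<Long> successBlockIds = new HashSet<>();
    Set<Long> failedBlockIds = new HashSet<>();
    blockIdsTracker.forEach(
        (blockId, counter) ->
            (counter.get() >= REPLICA_WRITE ? successBlockIds : failedBlockIds).add(blockId));
    System.out.println("success=" + successBlockIds + ", failed=" + failedBlockIds);
  }

  // Add the number of servers that acknowledged a block to its counter.
  private static void addAcks(Map<Long, AtomicInteger> tracker, long blockId, int servers) {
    tracker.get(blockId).addAndGet(servers);
  }
}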