in client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java [357:487]
public SendShuffleDataResult sendShuffleData(
    String appId,
    int stageAttemptNumber,
    List<ShuffleBlockInfo> shuffleBlockInfoList,
    Supplier<Boolean> needCancelRequest) {
  // Routing tables, one per sending round: shuffleServer -> shuffleId -> partitionId -> blocks.
  // Grouping blocks this way lets us issue one RPC request per shuffle server.
  Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> primaryRoundBlocks =
      Maps.newHashMap();
  Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> secondaryRoundBlocks =
      Maps.newHashMap();
  Map<ShuffleServerInfo, List<Long>> primaryRoundBlockIds = Maps.newHashMap();
  Map<ShuffleServerInfo, List<Long>> secondaryRoundBlockIds = Maps.newHashMap();
  // To reduce the data transferred under the quorum protocol, the replicas of each
  // block are split across two sending rounds:
  //  - primary round: replicas [0, replicaWrite) — the minimum number required when
  //    no sending server fails;
  //  - secondary round: replicas [replicaWrite, replica) — enough extra copies to
  //    tolerate up to (replica - replicaWrite) sending server failures.
  for (ShuffleBlockInfo blockInfo : shuffleBlockInfoList) {
    List<ShuffleServerInfo> candidateServers = blockInfo.getShuffleServerInfos();
    if (replicaSkipEnabled) {
      // The exclusion set is shared between the two calls so the secondary round
      // never picks a server already assigned to this block by the primary round.
      Set<ShuffleServerInfo> chosenServers = Sets.newHashSet();
      genServerToBlocks(
          blockInfo,
          candidateServers,
          replicaWrite,
          chosenServers,
          primaryRoundBlocks,
          primaryRoundBlockIds,
          true);
      genServerToBlocks(
          blockInfo,
          candidateServers,
          replica - replicaWrite,
          chosenServers,
          secondaryRoundBlocks,
          secondaryRoundBlockIds,
          false);
    } else {
      // With replica skipping disabled, every replica is sent in a single round.
      genServerToBlocks(
          blockInfo,
          candidateServers,
          candidateServers.size(),
          null,
          primaryRoundBlocks,
          primaryRoundBlockIds,
          false);
    }
  }
  // Per-block counter of successfully sent replicas. The map itself is populated
  // up-front on this thread (most blocks are expected to succeed, so no concurrent
  // map is needed); the AtomicInteger values absorb the increments performed
  // concurrently by the async senders.
  Map<Long, AtomicInteger> blockIdsSendSuccessTracker = Maps.newHashMap();
  for (List<Long> blockIds : primaryRoundBlockIds.values()) {
    for (Long blockId : blockIds) {
      blockIdsSendSuccessTracker.computeIfAbsent(blockId, unused -> new AtomicInteger(0));
    }
  }
  for (List<Long> blockIds : secondaryRoundBlockIds.values()) {
    for (Long blockId : blockIds) {
      blockIdsSendSuccessTracker.computeIfAbsent(blockId, unused -> new AtomicInteger(0));
    }
  }
  FailedBlockSendTracker blockIdsSendFailTracker = new FailedBlockSendTracker();
  ShuffleServerPushCostTracker shuffleServerPushCostTracker = new ShuffleServerPushCostTracker();
  // Round 1: push the primary replicas.
  boolean primaryAllSucceeded =
      sendShuffleDataAsync(
          appId,
          stageAttemptNumber,
          primaryRoundBlocks,
          primaryRoundBlockIds,
          blockIdsSendSuccessTracker,
          blockIdsSendFailTracker,
          secondaryRoundBlocks.isEmpty(),
          needCancelRequest,
          shuffleServerPushCostTracker);
  // Round 2 runs only when the primary round failed partially, which should be
  // infrequent. It may end up sending more than replicaWrite replicas overall, but
  // we deliberately avoid more complicated skipping logic because server crashes
  // are rare in production environments.
  if (!primaryAllSucceeded && !secondaryRoundBlocks.isEmpty() && !needCancelRequest.get()) {
    LOG.info("The sending of primary round is failed partially, so start the secondary round");
    sendShuffleDataAsync(
        appId,
        stageAttemptNumber,
        secondaryRoundBlocks,
        secondaryRoundBlockIds,
        blockIdsSendSuccessTracker,
        blockIdsSendFailTracker,
        true,
        needCancelRequest,
        shuffleServerPushCostTracker);
  }
  Set<Long> blockIdsSendSuccessSet = Sets.newHashSet();
  blockIdsSendSuccessTracker.forEach(
      (blockId, successCount) -> {
        if (successCount.get() >= replicaWrite) {
          blockIdsSendSuccessSet.add(blockId);
          // Once replicaWrite replicas have landed, the block counts as sent even if
          // some other replica attempt failed, so drop it from the failure tracker.
          blockIdsSendFailTracker.remove(blockId);
        }
      });
  return new SendShuffleDataResult(
      blockIdsSendSuccessSet, blockIdsSendFailTracker, shuffleServerPushCostTracker);
}