public SendShuffleDataResult sendShuffleData()

in client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java [310:429]


  public SendShuffleDataResult sendShuffleData(
      String appId,
      List<ShuffleBlockInfo> shuffleBlockInfoList,
      Supplier<Boolean> needCancelRequest) {
    // Sends every block in shuffleBlockInfoList to its shuffle servers and reports, per block id,
    // whether it reached at least replicaWrite sends.
    //
    // With replica skipping enabled, the work is split in two rounds to reduce data sent under the
    // quorum protocol:
    //   - primary round:   replicas [0, replicaWrite)       -- enough when no server fails;
    //   - secondary round: replicas [replicaWrite, replica) -- enough when at most
    //                      (replica - replicaWrite) sending servers fail.
    // The secondary round runs only if the primary round reports a failure, a secondary round
    // exists, and needCancelRequest has not asked us to stop.
    // With replica skipping disabled, all replicas are sent in a single round.

    // Routing maps, keyed as shuffle server -> shuffle id -> partition id -> blocks, so that
    // requests can be assembled per shuffle server.
    Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> firstRound =
        Maps.newHashMap();
    Map<ShuffleServerInfo, List<Long>> firstRoundIds = Maps.newHashMap();
    Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> backupRound =
        Maps.newHashMap();
    Map<ShuffleServerInfo, List<Long>> backupRoundIds = Maps.newHashMap();

    for (ShuffleBlockInfo blockInfo : shuffleBlockInfoList) {
      List<ShuffleServerInfo> servers = blockInfo.getShuffleServerInfos();
      if (!replicaSkipEnabled) {
        // Single round: every listed server is a target.
        genServerToBlocks(
            blockInfo, servers, servers.size(), null, firstRound, firstRoundIds, false);
        continue;
      }
      // One exclusion set is shared by both calls for this block -- presumably so servers chosen
      // for the primary round are not chosen again for the secondary one (confirm in
      // genServerToBlocks).
      Set<ShuffleServerInfo> excluded = Sets.newHashSet();
      genServerToBlocks(
          blockInfo, servers, replicaWrite, excluded, firstRound, firstRoundIds, true);
      genServerToBlocks(
          blockInfo,
          servers,
          replica - replicaWrite,
          excluded,
          backupRound,
          backupRoundIds,
          false);
    }

    // Per-block send counters, handed to sendShuffleDataAsync. All keys are inserted here, before
    // any sending starts, so a plain HashMap is sufficient; updates made from other threads go
    // through the AtomicInteger values only.
    Map<Long, AtomicInteger> sendCounts = Maps.newHashMap();
    for (List<Long> ids : firstRoundIds.values()) {
      for (Long id : ids) {
        sendCounts.put(id, new AtomicInteger(0));
      }
    }
    for (List<Long> ids : backupRoundIds.values()) {
      for (Long id : ids) {
        sendCounts.put(id, new AtomicInteger(0));
      }
    }

    // Primary round.
    // TODO: a fallback strategy when sending to multiple servers fails would be better than
    // failing the task.
    boolean primaryFullySucceeded =
        sendShuffleDataAsync(
            appId,
            firstRound,
            firstRoundIds,
            sendCounts,
            backupRound.isEmpty(),
            needCancelRequest);

    // Secondary round: expected to be rare (server crashes are uncommon), so no extra skipping
    // logic is applied even though it may push some blocks past replicaWrite copies.
    boolean skipBackupRound =
        primaryFullySucceeded || backupRound.isEmpty() || needCancelRequest.get();
    if (!skipBackupRound) {
      LOG.info("The sending of primary round is failed partially, so start the secondary round");
      sendShuffleDataAsync(
          appId, backupRound, backupRoundIds, sendCounts, true, needCancelRequest);
    }

    // A block succeeds when its recorded send count reaches replicaWrite; otherwise it failed.
    Set<Long> successBlockIds = Sets.newConcurrentHashSet();
    Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
    for (Map.Entry<Long, AtomicInteger> entry : sendCounts.entrySet()) {
      Set<Long> bucket =
          entry.getValue().get() >= replicaWrite ? successBlockIds : failedBlockIds;
      bucket.add(entry.getKey());
    }
    return new SendShuffleDataResult(successBlockIds, failedBlockIds);
  }