public SendShuffleDataResult sendShuffleData()

in client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java [357:487]


  public SendShuffleDataResult sendShuffleData(
      String appId,
      int stageAttemptNumber,
      List<ShuffleBlockInfo> shuffleBlockInfoList,
      Supplier<Boolean> needCancelRequest) {

    // shuffleServer -> shuffleId -> partitionId -> blocks
    Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>>
        primaryServerToBlocks = Maps.newHashMap();
    Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>>
        secondaryServerToBlocks = Maps.newHashMap();
    Map<ShuffleServerInfo, List<Long>> primaryServerToBlockIds = Maps.newHashMap();
    Map<ShuffleServerInfo, List<Long>> secondaryServerToBlockIds = Maps.newHashMap();

    // send shuffle blocks to shuffle servers
    // for all ShuffleBlockInfo, build the data structure shuffleServer -> shuffleId ->
    // partitionId -> blocks, which makes it easy to batch rpc requests per shuffleServer

    // In order to reduce the data sent in the quorum protocol,
    // we split these blocks into two rounds: primary and secondary.
    // The primary round contains replicas [0, replicaWrite),
    // the minimum number needed when there are no sending server failures.
    // The secondary round contains replicas [replicaWrite, replica),
    // the extra copies needed to tolerate at most *replica - replicaWrite* sending server
    // failures.
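    // e.g. with replica = 3 and replicaWrite = 2, each block is first sent to
    // 2 servers; the remaining 1 replica is sent only if the primary round
    // fails, and a block counts as successful once 2 servers have acked it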
    for (ShuffleBlockInfo sbi : shuffleBlockInfoList) {
      List<ShuffleServerInfo> allServers = sbi.getShuffleServerInfos();
      if (replicaSkipEnabled) {
        Set<ShuffleServerInfo> excludeServers = Sets.newHashSet();
        genServerToBlocks(
            sbi,
            allServers,
            replicaWrite,
            excludeServers,
            primaryServerToBlocks,
            primaryServerToBlockIds,
            true);
        genServerToBlocks(
            sbi,
            allServers,
            replica - replicaWrite,
            excludeServers,
            secondaryServerToBlocks,
            secondaryServerToBlockIds,
            false);
      } else {
        // When replicaSkip is disabled, we send data to all replicas within one round.
        genServerToBlocks(
            sbi,
            allServers,
            allServers.size(),
            null,
            primaryServerToBlocks,
            primaryServerToBlockIds,
            false);
      }
    }
    // Records, per block, how many sends succeeded (and, below, which sends failed).
    // We assume that most blocks can be sent successfully,
    // so the map is initialized up front without any concurrency protection;
    // AtomicInteger values are enough to make count updates visible across threads.
    Map<Long, AtomicInteger> blockIdsSendSuccessTracker = Maps.newHashMap();
    primaryServerToBlockIds
        .values()
        .forEach(
            blockList ->
                blockList.forEach(
                    block ->
                        blockIdsSendSuccessTracker.computeIfAbsent(
                            block, id -> new AtomicInteger(0))));
    secondaryServerToBlockIds
        .values()
        .forEach(
            blockList ->
                blockList.forEach(
                    block ->
                        blockIdsSendSuccessTracker.computeIfAbsent(
                            block, id -> new AtomicInteger(0))));
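    // at this point every block id that will be sent appears in the success
    // tracker with a count of 0, and the map's key set no longer changes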
    FailedBlockSendTracker blockIdsSendFailTracker = new FailedBlockSendTracker();
    ShuffleServerPushCostTracker shuffleServerPushCostTracker = new ShuffleServerPushCostTracker();

    // send the primary round of blocks.
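    // note: the boolean argument below (secondaryServerToBlocks.isEmpty() here,
    // true for the secondary round) presumably marks whether this is the final round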
    boolean isAllSuccess =
        sendShuffleDataAsync(
            appId,
            stageAttemptNumber,
            primaryServerToBlocks,
            primaryServerToBlockIds,
            blockIdsSendSuccessTracker,
            blockIdsSendFailTracker,
            secondaryServerToBlocks.isEmpty(),
            needCancelRequest,
            shuffleServerPushCostTracker);

    // The secondary round of blocks is sent only when some sends in the primary round fail.
    // This should be infrequent.
    // Even though the secondary round may send more than replicaWrite replicas of a block,
    // we do not apply more complicated skipping logic, because server crashes are rare in
    // production environments.
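    // e.g. with replica = 3 and replicaWrite = 2: if any primary send fails,
    // the whole secondary round is sent, so blocks whose 2 primary sends both
    // succeeded may still end up with 3 copies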
    if (!isAllSuccess && !secondaryServerToBlocks.isEmpty() && !needCancelRequest.get()) {
      LOG.info("The sending of primary round is failed partially, so start the secondary round");
      sendShuffleDataAsync(
          appId,
          stageAttemptNumber,
          secondaryServerToBlocks,
          secondaryServerToBlockIds,
          blockIdsSendSuccessTracker,
          blockIdsSendFailTracker,
          true,
          needCancelRequest,
          shuffleServerPushCostTracker);
    }

    Set<Long> blockIdsSendSuccessSet = Sets.newHashSet();
    blockIdsSendSuccessTracker
        .entrySet()
        .forEach(
            successBlockId -> {
              if (successBlockId.getValue().get() >= replicaWrite) {
                blockIdsSendSuccessSet.add(successBlockId.getKey());
                // If the replicaWrite to be sent is reached,
                // no matter whether the block fails to be sent or not,
                // the block is considered to have been sent successfully and is removed from the
                // failed block tracker
                blockIdsSendFailTracker.remove(successBlockId.getKey());
              }
            });
    return new SendShuffleDataResult(
        blockIdsSendSuccessSet, blockIdsSendFailTracker, shuffleServerPushCostTracker);
  }
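
The success-tracker idiom above (a plain HashMap fully populated on one thread, with only its AtomicInteger values mutated afterwards by sender threads) is safe because the map's structure never changes once concurrent sending begins. Below is a minimal, self-contained sketch of that idiom; the class name and the simulated sender pool are illustrative assumptions, not Uniffle code:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SendSuccessTrackerSketch {
  public static void main(String[] args) throws InterruptedException {
    // Populate the map before any concurrency begins; after this point only
    // the AtomicInteger values are mutated, never the map itself, so a plain
    // HashMap is safe to read from many threads.
    Map<Long, AtomicInteger> successTracker = new HashMap<>();
    for (long blockId = 0; blockId < 4; blockId++) {
      successTracker.put(blockId, new AtomicInteger(0));
    }

    // Simulate one sender thread per replica; each successful "send" bumps
    // the pre-existing counter, whose updates are visible across threads.
    int replica = 3;
    ExecutorService pool = Executors.newFixedThreadPool(replica);
    for (int r = 0; r < replica; r++) {
      pool.submit(() -> successTracker.values().forEach(AtomicInteger::incrementAndGet));
    }
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);

    // Apply the same success criterion as sendShuffleData: a block counts as
    // sent once it has reached replicaWrite acks.
    int replicaWrite = 2;
    successTracker.forEach(
        (blockId, count) -> {
          if (count.get() >= replicaWrite) {
            System.out.println("block " + blockId + " reached the write quorum");
          }
        });
  }
}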