private boolean sendShuffleDataAsync()

in client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java [167:283]


  /**
   * Sends shuffle blocks to every target shuffle server concurrently and waits for the outcome.
   *
   * <p>One asynchronous task is submitted to {@code dataTransferPool} per entry of {@code
   * serverToBlocks}. Each task builds an {@code RssSendShuffleDataRequest} for its server, sends
   * it, and then:
   *
   * <ul>
   *   <li>on {@code StatusCode.SUCCESS}: increments the counter in {@code
   *       blockIdsSendSuccessTracker} for every block id listed for that server in {@code
   *       serverToBlockIds}, records the partitions reported by the response as needing a split,
   *       removes the server from {@code defectiveServers} (when that set is non-null) and records
   *       the sent bytes and push duration in {@code shuffleServerPushCostTracker};
   *   <li>on any other status code, or on an exception: records the server's blocks as failed in
   *       {@code failedBlockSendTracker}, adds the server to {@code defectiveServers} (when
   *       non-null) and completes with {@code false}.
   * </ul>
   *
   * @param appId application id placed into each send request
   * @param stageAttemptNumber stage attempt number placed into each send request
   * @param serverToBlocks blocks to send, keyed by target server; the nested maps are handed to
   *     the request unchanged
   * @param serverToBlockIds ids of the blocks destined for each server; when {@code null} nothing
   *     is sent and the method returns {@code true}
   * @param blockIdsSendSuccessTracker per-block counters incremented once for each server that
   *     accepted the block
   * @param failedBlockSendTracker tracker that receives failed blocks and need-split partitions
   * @param allowFastFail forwarded to {@code ClientUtils.waitUntilDoneOrFail}
   * @param needCancelRequest checked at the start of each task; when it yields {@code true} the
   *     task logs, sends nothing and completes with {@code true}
   * @param shuffleServerPushCostTracker receives the server id, bytes sent and push duration of
   *     each successful send
   * @return {@code true} when {@code serverToBlockIds} is {@code null}; otherwise the result of
   *     {@code ClientUtils.waitUntilDoneOrFail} over all per-server tasks
   */
  private boolean sendShuffleDataAsync(
      String appId,
      int stageAttemptNumber,
      Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> serverToBlocks,
      Map<ShuffleServerInfo, List<Long>> serverToBlockIds,
      Map<Long, AtomicInteger> blockIdsSendSuccessTracker,
      FailedBlockSendTracker failedBlockSendTracker,
      boolean allowFastFail,
      Supplier<Boolean> needCancelRequest,
      ShuffleServerPushCostTracker shuffleServerPushCostTracker) {

    if (serverToBlockIds == null) {
      return true;
    }

    // If one or more servers is failed, the sending is not totally successful.
    List<CompletableFuture<Boolean>> futures = new ArrayList<>();
    for (Map.Entry<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> entry :
        serverToBlocks.entrySet()) {
      CompletableFuture<Boolean> future =
          CompletableFuture.supplyAsync(
                  () -> {
                    // A cancelled upstream task is not reported as a send failure.
                    if (needCancelRequest.get()) {
                      LOG.info("The upstream task has been failed. Abort this data send.");
                      return true;
                    }
                    ShuffleServerInfo ssi = entry.getKey();
                    try {
                      Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks =
                          entry.getValue();
                      // todo: compact unnecessary blocks that reach replicaWrite
                      RssSendShuffleDataRequest request =
                          new RssSendShuffleDataRequest(
                              appId,
                              stageAttemptNumber,
                              retryMax,
                              retryIntervalMax,
                              shuffleIdToBlocks);
                      long start = System.currentTimeMillis();
                      RssSendShuffleDataResponse response =
                          getShuffleServerClient(ssi).sendShuffleData(request);
                      long pushDuration = System.currentTimeMillis() - start;
                      String logMsg =
                          String.format(
                              "ShuffleWriteClientImpl sendShuffleData with %s blocks to %s cost: %s(ms)",
                              serverToBlockIds.get(ssi).size(), ssi.getId(), pushDuration);

                      if (response.getStatusCode() == StatusCode.SUCCESS) {
                        // mark a replica of block that has been sent
                        serverToBlockIds
                            .get(ssi)
                            .forEach(
                                blockId ->
                                    blockIdsSendSuccessTracker.get(blockId).incrementAndGet());
                        recordNeedSplitPartition(
                            failedBlockSendTracker, ssi, response.getNeedSplitPartitionIds());
                        if (defectiveServers != null) {
                          defectiveServers.remove(ssi);
                        }
                        if (LOG.isDebugEnabled()) {
                          LOG.debug("{} successfully.", logMsg);
                        }
                      } else {
                        recordFailedBlocks(
                            failedBlockSendTracker, serverToBlocks, ssi, response.getStatusCode());
                        if (defectiveServers != null) {
                          defectiveServers.add(ssi);
                        }
                        LOG.warn(
                            "{}, it failed wth statusCode[{}]", logMsg, response.getStatusCode());
                        return false;
                      }

                      // record shuffle-server push cost
                      // Accumulate as long: adding the per-block lengths in int arithmetic wraps
                      // around once a single request carries more than Integer.MAX_VALUE bytes.
                      long sentBytes =
                          shuffleIdToBlocks.values().stream()
                              .flatMap(partitionToBlocks -> partitionToBlocks.values().stream())
                              .flatMap(blocks -> blocks.stream())
                              .mapToLong(block -> block.getLength())
                              .sum();
                      shuffleServerPushCostTracker.record(ssi.getId(), sentBytes, pushDuration);
                    } catch (Exception e) {
                      recordFailedBlocks(
                          failedBlockSendTracker, serverToBlocks, ssi, StatusCode.INTERNAL_ERROR);
                      if (defectiveServers != null) {
                        defectiveServers.add(ssi);
                      }
                      // Tolerate a missing id list here: a null lookup may be the very reason we
                      // are in this handler, and throwing again would lose this warning.
                      List<Long> failedBlockIds = serverToBlockIds.get(ssi);
                      LOG.warn(
                          "Send: "
                              + (failedBlockIds == null ? 0 : failedBlockIds.size())
                              + " blocks to ["
                              + ssi.getId()
                              + "] failed.",
                          e);
                      return false;
                    }
                    return true;
                  },
                  dataTransferPool)
              // Safety net for anything that escapes the task body, including its catch block.
              .exceptionally(
                  ex -> {
                    LOG.error("Unexpected exceptions occurred while sending shuffle data", ex);
                    return false;
                  });
      futures.add(future);
    }

    boolean result = ClientUtils.waitUntilDoneOrFail(futures, allowFastFail);
    if (!result) {
      LOG.error(
          "Some shuffle data can't be sent to shuffle-server, is fast fail: {}, cancelled task size: {}",
          allowFastFail,
          futures.size());
    }
    return result;
  }