private boolean sendShuffleDataAsync()

in client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java [181:266]


  private boolean sendShuffleDataAsync(
      String appId,
      Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> serverToBlocks,
      Map<ShuffleServerInfo, List<Long>> serverToBlockIds,
      Map<Long, AtomicInteger> blockIdsTracker,
      boolean allowFastFail,
      Supplier<Boolean> needCancelRequest) {

    if (serverToBlockIds == null) {
      return true;
    }

    // If one or more servers is failed, the sending is not totally successful.
    List<CompletableFuture<Boolean>> futures = new ArrayList<>();
    for (Map.Entry<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> entry :
        serverToBlocks.entrySet()) {
      CompletableFuture<Boolean> future =
          CompletableFuture.supplyAsync(
              () -> {
                if (needCancelRequest.get()) {
                  LOG.info("The upstream task has been failed. Abort this data send.");
                  return true;
                }
                ShuffleServerInfo ssi = entry.getKey();
                try {
                  Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks =
                      entry.getValue();
                  // todo: compact unnecessary blocks that reach replicaWrite
                  RssSendShuffleDataRequest request =
                      new RssSendShuffleDataRequest(
                          appId, retryMax, retryIntervalMax, shuffleIdToBlocks);
                  long s = System.currentTimeMillis();
                  RssSendShuffleDataResponse response =
                      getShuffleServerClient(ssi).sendShuffleData(request);

                  String logMsg =
                      String.format(
                          "ShuffleWriteClientImpl sendShuffleData with %s blocks to %s cost: %s(ms)",
                          serverToBlockIds.get(ssi).size(),
                          ssi.getId(),
                          System.currentTimeMillis() - s);

                  if (response.getStatusCode() == StatusCode.SUCCESS) {
                    // mark a replica of block that has been sent
                    serverToBlockIds
                        .get(ssi)
                        .forEach(block -> blockIdsTracker.get(block).incrementAndGet());
                    if (defectiveServers != null) {
                      defectiveServers.remove(ssi);
                    }
                    LOG.debug("{} successfully.", logMsg);
                  } else {
                    if (defectiveServers != null) {
                      defectiveServers.add(ssi);
                    }
                    LOG.warn("{}, it failed wth statusCode[{}]", logMsg, response.getStatusCode());
                    return false;
                  }
                } catch (Exception e) {
                  if (defectiveServers != null) {
                    defectiveServers.add(ssi);
                  }
                  LOG.warn(
                      "Send: "
                          + serverToBlockIds.get(ssi).size()
                          + " blocks to ["
                          + ssi.getId()
                          + "] failed.",
                      e);
                  return false;
                }
                return true;
              },
              dataTransferPool);
      futures.add(future);
    }

    boolean result = ClientUtils.waitUntilDoneOrFail(futures, allowFastFail);
    if (!result) {
      LOG.error(
          "Some shuffle data can't be sent to shuffle-server, is fast fail: {}, cancelled task size: {}",
          allowFastFail,
          futures.size());
    }
    return result;
  }