public boolean sendCommit()

in client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java [442:525]


  public boolean sendCommit(
      Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int numMaps) {
    ForkJoinPool forkJoinPool =
        new ForkJoinPool(
            dataCommitPoolSize == -1 ? shuffleServerInfoSet.size() : dataCommitPoolSize);
    AtomicInteger successfulCommit = new AtomicInteger(0);
    try {
      forkJoinPool
          .submit(
              () -> {
                shuffleServerInfoSet
                    .parallelStream()
                    .forEach(
                        ssi -> {
                          RssSendCommitRequest request = new RssSendCommitRequest(appId, shuffleId);
                          String errorMsg =
                              "Failed to commit shuffle data to "
                                  + ssi
                                  + " for shuffleId["
                                  + shuffleId
                                  + "]";
                          long startTime = System.currentTimeMillis();
                          try {
                            RssSendCommitResponse response =
                                getShuffleServerClient(ssi).sendCommit(request);
                            if (response.getStatusCode() == StatusCode.SUCCESS) {
                              int commitCount = response.getCommitCount();
                              LOG.info(
                                  "Successfully sendCommit for appId["
                                      + appId
                                      + "], shuffleId["
                                      + shuffleId
                                      + "] to ShuffleServer["
                                      + ssi.getId()
                                      + "], cost "
                                      + (System.currentTimeMillis() - startTime)
                                      + " ms, got committed maps["
                                      + commitCount
                                      + "], map number of stage is "
                                      + numMaps);
                              if (commitCount >= numMaps) {
                                RssFinishShuffleResponse rfsResponse =
                                    getShuffleServerClient(ssi)
                                        .finishShuffle(
                                            new RssFinishShuffleRequest(appId, shuffleId));
                                if (rfsResponse.getStatusCode() != StatusCode.SUCCESS) {
                                  String msg =
                                      "Failed to finish shuffle to "
                                          + ssi
                                          + " for shuffleId["
                                          + shuffleId
                                          + "] with statusCode "
                                          + rfsResponse.getStatusCode();
                                  LOG.error(msg);
                                  throw new Exception(msg);
                                } else {
                                  LOG.info(
                                      "Successfully finish shuffle to "
                                          + ssi
                                          + " for shuffleId["
                                          + shuffleId
                                          + "]");
                                }
                              }
                            } else {
                              String msg =
                                  errorMsg + " with statusCode " + response.getStatusCode();
                              LOG.error(msg);
                              throw new Exception(msg);
                            }
                            successfulCommit.incrementAndGet();
                          } catch (Exception e) {
                            LOG.error(errorMsg, e);
                          }
                        });
              })
          .join();
    } finally {
      forkJoinPool.shutdownNow();
    }

    // check if every commit/finish call is successful
    return successfulCommit.get() == shuffleServerInfoSet.size();
  }