public void reportShuffleResult()

in client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java [659:744]


  public void reportShuffleResult(
      Map<Integer, List<ShuffleServerInfo>> partitionToServers,
      String appId,
      int shuffleId,
      long taskAttemptId,
      Map<Integer, List<Long>> partitionToBlockIds,
      int bitmapNum) {
    Map<ShuffleServerInfo, List<Integer>> groupedPartitions = Maps.newHashMap();
    Map<Integer, Integer> partitionReportTracker = Maps.newHashMap();
    for (Map.Entry<Integer, List<ShuffleServerInfo>> entry : partitionToServers.entrySet()) {
      int partitionIdx = entry.getKey();
      for (ShuffleServerInfo ssi : entry.getValue()) {
        if (!groupedPartitions.containsKey(ssi)) {
          groupedPartitions.put(ssi, Lists.newArrayList());
        }
        groupedPartitions.get(ssi).add(partitionIdx);
      }
      if (CollectionUtils.isNotEmpty(partitionToBlockIds.get(partitionIdx))) {
        partitionReportTracker.putIfAbsent(partitionIdx, 0);
      }
    }

    for (Map.Entry<ShuffleServerInfo, List<Integer>> entry : groupedPartitions.entrySet()) {
      Map<Integer, List<Long>> requestBlockIds = Maps.newHashMap();
      for (Integer partitionId : entry.getValue()) {
        List<Long> blockIds = partitionToBlockIds.get(partitionId);
        if (CollectionUtils.isNotEmpty(blockIds)) {
          requestBlockIds.put(partitionId, blockIds);
        }
      }
      if (requestBlockIds.isEmpty()) {
        continue;
      }
      RssReportShuffleResultRequest request =
          new RssReportShuffleResultRequest(
              appId, shuffleId, taskAttemptId, requestBlockIds, bitmapNum);
      ShuffleServerInfo ssi = entry.getKey();
      try {
        RssReportShuffleResultResponse response =
            getShuffleServerClient(ssi).reportShuffleResult(request);
        if (response.getStatusCode() == StatusCode.SUCCESS) {
          LOG.info(
              "Report shuffle result to "
                  + ssi
                  + " for appId["
                  + appId
                  + "], shuffleId["
                  + shuffleId
                  + "] successfully");
          for (Integer partitionId : requestBlockIds.keySet()) {
            partitionReportTracker.put(partitionId, partitionReportTracker.get(partitionId) + 1);
          }
        } else {
          LOG.warn(
              "Report shuffle result to "
                  + ssi
                  + " for appId["
                  + appId
                  + "], shuffleId["
                  + shuffleId
                  + "] failed with "
                  + response.getStatusCode());
        }
      } catch (Exception e) {
        LOG.warn(
            "Report shuffle result is failed to "
                + ssi
                + " for appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "]");
      }
    }
    // quorum check
    for (Map.Entry<Integer, Integer> entry : partitionReportTracker.entrySet()) {
      if (entry.getValue() < replicaWrite) {
        throw new RssException(
            "Quorum check of report shuffle result is failed for appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "]");
      }
    }
  }