private void submitRetryPushMergedData()

in client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java [327:455]


  /**
   * Waits for the results of the revive requests issued for a failed merged-data push and then
   * either re-pushes the batches, schedules another revive round, or records a terminal failure.
   *
   * <p>{@code reviveRequests[i]} is treated as the revive request for {@code batches.get(i)}. The
   * requests are polled in order, sleeping 50 ms whenever the current one is still in {@code
   * REVIVE_INITIALIZED} state, until all are finished or the wait budget derived from {@code
   * reviveResponseDueTime} is used up. For each finished request:
   *
   * <ul>
   *   <li>if the mapper has already ended, the batch is dropped;
   *   <li>if the revive succeeded, the batch is regrouped under the address pair of the location
   *       currently stored in {@code reducePartitionMap} and pushed via {@code doPushMergedData};
   *   <li>otherwise the batch is queued for another revive round when {@code remainReviveTimes >
   *       0}, else an exception is stored in {@code pushState.exception} and the method returns
   *       immediately without pushing anything.
   * </ul>
   *
   * Requests that are still unfinished when polling stops are handled like failed revives.
   *
   * @param pushState push state that receives the failure exception and from which the old grouped
   *     batch is removed once no batch needs another revive
   * @param shuffleId shuffle the batches belong to
   * @param mapId map task id
   * @param attemptId map attempt id
   * @param batches batches of the failed merged push, index-aligned with {@code reviveRequests}
   * @param cause status code of the original push failure; forwarded to follow-up revive requests
   *     and included in the failure message
   * @param oldGroupedBatchId id of the grouped batch that failed
   * @param reviveRequests revive requests to wait for, index-aligned with {@code batches}
   * @param remainReviveTimes number of additional revive rounds still allowed
   * @param reviveResponseDueTime wall-clock time (epoch millis) until which revive results are
   *     awaited
   */
  private void submitRetryPushMergedData(
      PushState pushState,
      int shuffleId,
      int mapId,
      int attemptId,
      ArrayList<DataBatches.DataBatch> batches,
      StatusCode cause,
      Integer oldGroupedBatchId,
      ReviveRequest[] reviveRequests,
      int remainReviveTimes,
      long reviveResponseDueTime) {
    HashMap<Pair<String, String>, DataBatches> newDataBatchesMap = new HashMap<>();
    ArrayList<DataBatches.DataBatch> reviveFailedBatches = new ArrayList<>();

    long reviveWaitTime = reviveResponseDueTime - System.currentTimeMillis();
    final long delta = 50;
    long accumulatedTime = 0;
    int index = 0;
    while (index < reviveRequests.length && accumulatedTime <= reviveWaitTime) {
      ReviveRequest request = reviveRequests[index];
      DataBatches.DataBatch batch = batches.get(index);
      if (request.reviveStatus != StatusCode.REVIVE_INITIALIZED.getValue()) {
        if (mapperEnded(shuffleId, mapId)) {
          logger.debug(
              "Revive for push merged data success, but the mapper already ended for shuffle {} map {} attempt {} partition {} batch {}.",
              shuffleId,
              mapId,
              attemptId,
              request.partitionId,
              oldGroupedBatchId);
        } else if (request.reviveStatus == StatusCode.SUCCESS.getValue()) {
          PartitionLocation newLoc = reducePartitionMap.get(shuffleId).get(request.partitionId);
          DataBatches newDataBatches =
              newDataBatchesMap.computeIfAbsent(genAddressPair(newLoc), (s) -> new DataBatches());
          newDataBatches.addDataBatch(newLoc, batch.batchId, batch.body);
        } else {
          if (remainReviveTimes > 0) {
            reviveFailedBatches.add(batch);
          } else {
            recordMergedDataReviveFailure(
                pushState, shuffleId, mapId, attemptId, oldGroupedBatchId, cause, request, batch);
            return;
          }
        }
        index++;
      } else {
        try {
          Thread.sleep(delta);
        } catch (InterruptedException e) {
          logger.error("Interrupted while waiting for Revive result!");
          Thread.currentThread().interrupt();
          // With the interrupt flag restored every further sleep would throw immediately, so
          // continuing to poll would only spin and spam the log. Stop waiting; the requests that
          // are still pending are handled as timed out below.
          break;
        }
        accumulatedTime += delta;
      }
    }

    // Requests from `index` onwards were not processed before polling stopped.
    for (int i = index; i < reviveRequests.length; i++) {
      if (remainReviveTimes > 0) {
        reviveFailedBatches.add(batches.get(i));
      } else {
        // Report the request that belongs to batch i, not the one at `index`.
        recordMergedDataReviveFailure(
            pushState,
            shuffleId,
            mapId,
            attemptId,
            oldGroupedBatchId,
            cause,
            reviveRequests[i],
            batches.get(i));
        return;
      }
    }

    for (Map.Entry<Pair<String, String>, DataBatches> entry : newDataBatchesMap.entrySet()) {
      Pair<String, String> addressPair = entry.getKey();
      DataBatches newDataBatches = entry.getValue();
      doPushMergedData(
          addressPair,
          shuffleId,
          mapId,
          attemptId,
          newDataBatches.requireBatches(),
          pushState,
          remainReviveTimes);
    }
    if (reviveFailedBatches.isEmpty()) {
      pushState.removeBatch(oldGroupedBatchId, batches.get(0).loc.hostAndPushPort());
    } else {
      // Some batches still have no usable location: revive them again and re-run this method
      // asynchronously with one fewer revive round and a fresh deadline.
      ReviveRequest[] requests =
          addAndGetReviveRequests(shuffleId, mapId, attemptId, reviveFailedBatches, cause);
      pushDataRetryPool.submit(
          () ->
              submitRetryPushMergedData(
                  pushState,
                  shuffleId,
                  mapId,
                  attemptId,
                  reviveFailedBatches,
                  cause,
                  oldGroupedBatchId,
                  requests,
                  remainReviveTimes - 1,
                  System.currentTimeMillis()
                      + conf.clientRpcRequestPartitionLocationRpcAskTimeout()
                          .duration()
                          .toMillis()));
    }
  }

  /**
   * Stores a "revive failed" exception for the given request/batch in {@code pushState.exception}
   * unless an exception has already been stored there.
   */
  private void recordMergedDataReviveFailure(
      PushState pushState,
      int shuffleId,
      int mapId,
      int attemptId,
      Integer oldGroupedBatchId,
      StatusCode cause,
      ReviveRequest request,
      DataBatches.DataBatch batch) {
    String errorMsg =
        String.format(
            "Revive failed while pushing merged for shuffle %d map %d attempt %d partition %d batch %d location %s.",
            shuffleId, mapId, attemptId, request.partitionId, oldGroupedBatchId, batch.loc);
    pushState.exception.compareAndSet(
        null,
        new CelebornIOException(
            errorMsg,
            new CelebornIOException(
                cause
                    + " then revive but "
                    + request.reviveStatus
                    + "("
                    + Utils.toStatusCode(request.reviveStatus)
                    + ")")));
  }