private void submitRetryPushData()

in client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java [215:307]


  /**
   * Waits for the outcome of a revive request and, if it succeeded, re-pushes the batch to the
   * partition location currently registered for the partition.
   *
   * <p>The method polls {@code request.reviveStatus} until it leaves {@code REVIVE_INITIALIZED},
   * the time remaining until {@code dueTime} has been slept away, or the calling thread is
   * interrupted. Afterwards exactly one of the following happens:
   *
   * <ul>
   *   <li>the mapper has already ended: the batch is removed from {@code pushState};
   *   <li>the revive status is not {@code SUCCESS}: the callback is failed with a {@link
   *       CelebornIOException} describing the original cause and the revive status;
   *   <li>otherwise: the data is pushed to the new location, and any exception raised while doing
   *       so is reported to the callback as {@code PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY}.
   * </ul>
   *
   * @param shuffleId shuffle the batch belongs to
   * @param body raw bytes of the batch; wrapped (not copied) into the buffer that is pushed
   * @param batchId identifier of the batch, used for push-state bookkeeping and logging
   * @param pushDataRpcResponseCallback callback that receives the result of the retried push
   * @param pushState push state from which the batch is removed when the mapper already ended
   * @param request revive request carrying map id, attempt id, old location, failure cause and
   *     the revive status being polled
   * @param remainReviveTimes remaining revive attempts; only consulted when {@code
   *     testRetryRevive} is set, in which case a value of at least 1 triggers a mock failure
   * @param dueTime deadline for the revive result, compared against {@link
   *     System#currentTimeMillis()}
   */
  private void submitRetryPushData(
      int shuffleId,
      byte[] body,
      int batchId,
      PushDataRpcResponseCallback pushDataRpcResponseCallback,
      PushState pushState,
      ReviveRequest request,
      int remainReviveTimes,
      long dueTime) {
    int mapId = request.mapId;
    int attemptId = request.attemptId;
    PartitionLocation loc = request.loc;
    StatusCode cause = request.cause;
    int partitionId = loc.getId();
    long reviveWaitTime = dueTime - System.currentTimeMillis();
    final long pollIntervalMs = 50;
    long accumulatedTime = 0;
    while (request.reviveStatus == StatusCode.REVIVE_INITIALIZED.getValue()
        && accumulatedTime <= reviveWaitTime) {
      try {
        Thread.sleep(pollIntervalMs);
        accumulatedTime += pollIntervalMs;
      } catch (InterruptedException e) {
        logger.error("Interrupted while waiting for Revive result!");
        Thread.currentThread().interrupt();
        // Stop waiting: with the interrupt flag restored every further sleep() would throw
        // immediately without advancing accumulatedTime, turning this loop into a busy spin.
        // The branches below handle a revive that has not completed.
        break;
      }
    }
    if (mapperEnded(shuffleId, mapId)) {
      logger.debug(
          "Revive for push data success, but the mapper already ended for shuffle {} map {} attempt {} partition {} batch {} location {}.",
          shuffleId,
          mapId,
          attemptId,
          partitionId,
          batchId,
          loc);
      pushState.removeBatch(batchId, loc.hostAndPushPort());
    } else if (request.reviveStatus != StatusCode.SUCCESS.getValue()) {
      // Covers explicit revive failures as well as a revive that is still pending after the
      // wait timed out or was interrupted.
      pushDataRpcResponseCallback.onFailure(
          new CelebornIOException(
              cause
                  + " then revive but "
                  + StatusCode.REVIVE_FAILED
                  + ", revive status "
                  + request.reviveStatus
                  + "("
                  + Utils.toStatusCode(request.reviveStatus)
                  + ")"
                  + ", old location: "
                  + request.loc));
    } else {
      PartitionLocation newLoc = reducePartitionMap.get(shuffleId).get(partitionId);
      logger.info(
          "Revive for push data success, new location for shuffle {} map {} attempt {} partition {} batch {} is location {}.",
          shuffleId,
          mapId,
          attemptId,
          partitionId,
          batchId,
          newLoc);
      try {
        if (!isPushTargetWorkerExcluded(newLoc, pushDataRpcResponseCallback)) {
          if (!testRetryRevive || remainReviveTimes < 1) {
            TransportClient client =
                dataClientFactory.createClient(newLoc.getHost(), newLoc.getPushPort(), partitionId);
            NettyManagedBuffer newBuffer = new NettyManagedBuffer(Unpooled.wrappedBuffer(body));
            String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
            PushData newPushData =
                new PushData(PRIMARY_MODE, shuffleKey, newLoc.getUniqueId(), newBuffer);
            // Point the callback at the new location before the push is issued.
            pushDataRpcResponseCallback.updateLatestPartition(newLoc);
            client.pushData(newPushData, pushDataTimeout, pushDataRpcResponseCallback);
          } else {
            // Test hook: simulate a failed retry submission while revive attempts remain.
            throw new RuntimeException(
                "Mock push data submit retry failed. remainReviveTimes = "
                    + remainReviveTimes
                    + ".");
          }
        }
      } catch (Exception e) {
        logger.error(
            "Exception raised while pushing data for shuffle {} map {} attempt {} partition {} batch {} location {}.",
            shuffleId,
            mapId,
            attemptId,
            partitionId,
            batchId,
            newLoc,
            e);
        pushDataRpcResponseCallback.onFailure(
            new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY, e));
      }
    }
  }