public void reportShuffleWriteFailure()

in client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java [67:159]


  public void reportShuffleWriteFailure(
      RssProtos.ReportShuffleWriteFailureRequest request,
      StreamObserver<RssProtos.ReportShuffleWriteFailureResponse> responseObserver) {
    // Handles a shuffle write failure reported by a task and answers whether the whole stage must
    // be resubmitted. Exactly one reply is emitted followed by completion, unless evaluating the
    // report throws (an RssException wrapping a SparkException raised while clearing map output
    // metadata), in which case nothing is sent through responseObserver.
    RssProtos.ReportShuffleWriteFailureResponse reply = evaluateShuffleWriteFailure(request);
    responseObserver.onNext(reply);
    responseObserver.onCompleted();
  }

  /**
   * Decides the outcome of a shuffle write failure report.
   *
   * <p>A report for a different application id, or one that the per-shuffle failure record flags
   * via {@code resetStageAttemptIfNecessary} (logged as an old stage attempt), yields {@code
   * INVALID_REQUEST}. Otherwise the failure counts of the reported servers are incremented while
   * holding the record's monitor; when the record signals the limit was hit, the map output and
   * server-side shuffle registrations are cleared once per record and a whole-stage resubmission
   * is requested.
   *
   * @param request the incoming failure report
   * @return the reply to send back to the reporting task
   * @throws RssException if clearing the map output metadata fails
   */
  private RssProtos.ReportShuffleWriteFailureResponse evaluateShuffleWriteFailure(
      RssProtos.ReportShuffleWriteFailureRequest request) {
    final String appId = request.getAppId();
    final int shuffleId = request.getShuffleId();
    final int stageAttemptId = request.getStageAttemptId();
    final int stageAttemptNumber = request.getStageAttemptNumber();
    final List<RssProtos.ShuffleServerId> reportedServerIds = request.getShuffleServerIdsList();

    // Guard: reject reports that were not addressed to this application.
    if (!appId.equals(shuffleManager.getAppId())) {
      String mismatch =
          String.format(
              "got a wrong shuffle write failure report from appId: %s, expected appId: %s",
              appId, shuffleManager.getAppId());
      LOG.warn(mismatch);
      return buildWriteFailureReply(RssProtos.StatusCode.INVALID_REQUEST, false, mismatch);
    }

    // Seed counts (all zero) for the reported servers; only consumed when this shuffle has no
    // failure record yet.
    final Map<String, AtomicInteger> seedFailures = JavaUtils.newConcurrentMap();
    final List<ShuffleServerInfo> failedServers = ShuffleServerInfo.fromProto(reportedServerIds);
    for (ShuffleServerInfo server : failedServers) {
      seedFailures.computeIfAbsent(server.getId(), ignored -> new AtomicInteger(0));
    }
    final ShuffleServerWriterFailureRecord failureRecord =
        shuffleWriteStatus.computeIfAbsent(
            shuffleId,
            ignored -> new ShuffleServerWriterFailureRecord(stageAttemptNumber, seedFailures));

    // Guard: a true result is treated as a report from an older stage attempt.
    if (failureRecord.resetStageAttemptIfNecessary(stageAttemptNumber)) {
      String stale =
          String.format(
              "got an old stage(%d_%d) shuffle write failure report, which should be impossible.",
              stageAttemptId, stageAttemptNumber);
      LOG.warn(stale);
      return buildWriteFailureReply(RssProtos.StatusCode.INVALID_REQUEST, false, stale);
    }

    synchronized (failureRecord) {
      boolean limitReached =
          failureRecord.incWriteFailureForShuffleServer(
              stageAttemptNumber, failedServers, shuffleManager);
      if (!limitReached) {
        return buildWriteFailureReply(
            RssProtos.StatusCode.SUCCESS,
            false,
            "The maximum number of failures was not reached.");
      }

      String resubmitReason =
          String.format(
              "Report shuffle write failure as maximum number(%d) of shuffle write is occurred.",
              shuffleManager.getMaxFetchFailures());
      // The flag on the record makes the cleanup below run at most once per record.
      if (!failureRecord.isClearedMapTrackerBlock()) {
        try {
          // Drop the map output metadata of already-completed tasks; per the original author,
          // skipping this would lose part of the stage's data.
          shuffleManager.unregisterAllMapOutput(shuffleId);
          // Also unregister this shuffle id on the shuffle server side.
          shuffleManager.getShuffleWriteClient().unregisterShuffle(appId, shuffleId);
          failureRecord.setClearedMapTrackerBlock(true);
          LOG.info(
              "Clear shuffle result in shuffleId:{}, stageId:{}, stageAttemptNumber:{} in the write failure phase.",
              shuffleId,
              stageAttemptId,
              stageAttemptNumber);
        } catch (SparkException e) {
          LOG.error(
              "Clear MapoutTracker Meta failed in shuffleId:{}, stageAttemptId:{}, stageAttemptNumber:{} in the write failure phase.",
              shuffleId,
              stageAttemptId,
              stageAttemptNumber);
          throw new RssException("Clear MapoutTracker Meta failed!", e);
        }
      }
      return buildWriteFailureReply(RssProtos.StatusCode.SUCCESS, true, resubmitReason);
    }
  }

  /** Assembles the gRPC reply for a shuffle write failure report. */
  private static RssProtos.ReportShuffleWriteFailureResponse buildWriteFailureReply(
      RssProtos.StatusCode status, boolean reSubmitWholeStage, String msg) {
    return RssProtos.ReportShuffleWriteFailureResponse.newBuilder()
        .setStatus(status)
        .setReSubmitWholeStage(reSubmitWholeStage)
        .setMsg(msg)
        .build();
  }