public void sendShuffleData()

in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [408:621]


  /**
   * Handles a {@code sendShuffleData} RPC: validates the request, caches every partition's data
   * through the {@code ShuffleTaskManager} and answers with one {@code SendShuffleDataResponse}.
   *
   * <p>Status of the reply, by path:
   *
   * <ul>
   *   <li>{@code APP_NOT_FOUND} - no {@code ShuffleTaskInfo} is registered for the app id.
   *   <li>{@code STAGE_RETRY_IGNORE} - the request's stage attempt number is lower than the latest
   *       one recorded for the shuffle.
   *   <li>{@code INTERNAL_ERROR} - the request carries no shuffle data, the pre-allocated buffer
   *       for {@code requireBufferId} cannot be found, or an unexpected exception is thrown while
   *       caching.
   *   <li>{@code EXCEED_HUGE_PARTITION_HARD_LIMIT} - caching threw {@code
   *       ExceedHugePartitionHardLimitException}.
   *   <li>Otherwise the status returned by {@code cacheShuffleData} ({@code SUCCESS} when every
   *       partition was cached).
   * </ul>
   *
   * <p>Caching stops at the first partition that fails, so the failure status is the one that is
   * reported to the client and no later partition can overwrite it.
   *
   * @param req the request holding app id, shuffle id, require-buffer id, client timestamp, stage
   *     attempt number and the shuffle data itself
   * @param responseObserver receives the reply followed by {@code onCompleted()} on every path
   *     that returns normally
   */
  public void sendShuffleData(
      SendShuffleDataRequest req, StreamObserver<SendShuffleDataResponse> responseObserver) {
    try (ServerRpcAuditContext auditContext = createAuditContext("sendShuffleData")) {
      SendShuffleDataResponse reply;
      String appId = req.getAppId();
      int shuffleId = req.getShuffleId();
      long requireBufferId = req.getRequireBufferId();
      long timestamp = req.getTimestamp();
      int stageAttemptNumber = req.getStageAttemptNumber();

      auditContext.withAppId(appId).withShuffleId(shuffleId);
      auditContext.withArgs(
          "requireBufferId="
              + requireBufferId
              + ", timestamp="
              + timestamp
              + ", stageAttemptNumber="
              + stageAttemptNumber
              + ", shuffleDataSize="
              + req.getShuffleDataCount());

      ShuffleTaskInfo taskInfo = shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId);
      if (taskInfo == null) {
        String errorMsg =
            "APP_NOT_FOUND error, requireBufferId["
                + requireBufferId
                + "] for appId["
                + appId
                + "], shuffleId["
                + shuffleId
                + "]";
        LOG.error(errorMsg);
        ShuffleServerMetrics.counterAppNotFound.inc();
        reply =
            SendShuffleDataResponse.newBuilder()
                .setStatus(StatusCode.APP_NOT_FOUND.toProto())
                .setRetMsg(errorMsg)
                .build();
        auditContext.withStatusCode(StatusCode.fromProto(reply.getStatus()));
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
        return;
      }
      Integer latestStageAttemptNumber = taskInfo.getLatestStageAttemptNumber(shuffleId);
      // The Stage retry occurred, and the task before StageNumber was simply ignored and not
      // processed if the task was being sent.
      // The null check avoids an auto-unboxing NullPointerException should no attempt number be
      // available for this shuffle; in that case the request is not treated as stale.
      if (latestStageAttemptNumber != null && stageAttemptNumber < latestStageAttemptNumber) {
        String responseMessage = "A retry has occurred at the Stage, sending data is invalid.";
        reply =
            SendShuffleDataResponse.newBuilder()
                .setStatus(StatusCode.STAGE_RETRY_IGNORE.toProto())
                .setRetMsg(responseMessage)
                .build();
        auditContext.withStatusCode(StatusCode.fromProto(reply.getStatus()));
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
        return;
      }
      if (timestamp > 0) {
        /*
         * Here we record the transport time, but we don't consider the impact of data size on transport time.
         * The amount of data will not cause great fluctuations in latency. For example, 100K costs 1ms,
         * and 1M costs 10ms. This seems like a normal fluctuation, but it may rise to 10s when the server load is high.
         * In addition, we need to pay attention to that the time of the client machine and the machine
         * time of the Shuffle Server should be kept in sync. TransportTime is accurate only if this condition is met.
         * */
        long transportTime = System.currentTimeMillis() - timestamp;
        // A non-positive value means the clocks disagree; such samples are not recorded.
        if (transportTime > 0) {
          shuffleServer
              .getGrpcMetrics()
              .recordTransportTime(
                  ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, transportTime);
        }
      }
      // Must be read before the pre-allocated buffer is removed below.
      int requireSize = shuffleServer.getShuffleTaskManager().getRequireBufferSize(requireBufferId);

      StatusCode ret = StatusCode.SUCCESS;
      String responseMessage = "OK";
      if (req.getShuffleDataCount() > 0) {
        ShuffleServerMetrics.counterTotalReceivedDataSize.inc(requireSize);
        ShuffleTaskManager manager = shuffleServer.getShuffleTaskManager();
        PreAllocatedBufferInfo info = manager.getAndRemovePreAllocatedBuffer(requireBufferId);
        boolean isPreAllocated = info != null;
        if (!isPreAllocated) {
          String errorMsg =
              "Can't find requireBufferId["
                  + requireBufferId
                  + "] for appId["
                  + appId
                  + "], shuffleId["
                  + shuffleId
                  + "]";
          LOG.warn(errorMsg);
          responseMessage = errorMsg;
          reply =
              SendShuffleDataResponse.newBuilder()
                  .setStatus(StatusCode.INTERNAL_ERROR.toProto())
                  .setRetMsg(responseMessage)
                  .build();
          auditContext.withStatusCode(StatusCode.fromProto(reply.getStatus()));
          responseObserver.onNext(reply);
          responseObserver.onCompleted();
          return;
        }
        final long start = System.currentTimeMillis();
        List<ShufflePartitionedData> shufflePartitionedDataList = toPartitionedDataList(req);
        long alreadyReleasedSize = 0;
        boolean hasFailureOccurred = false;
        for (ShufflePartitionedData spd : shufflePartitionedDataList) {
          String shuffleDataInfo =
              "appId["
                  + appId
                  + "], shuffleId["
                  + shuffleId
                  + "], partitionId["
                  + spd.getPartitionId()
                  + "]";
          try {
            ret = manager.cacheShuffleData(appId, shuffleId, isPreAllocated, spd);
            if (ret != StatusCode.SUCCESS) {
              String errorMsg =
                  "Error happened when shuffleEngine.write for "
                      + shuffleDataInfo
                      + ", statusCode="
                      + ret;
              LOG.error(errorMsg);
              responseMessage = errorMsg;
              hasFailureOccurred = true;
              break;
            } else {
              if (shuffleServer.isRemoteMergeEnable()) {
                shuffleServer.getShuffleMergeManager().setDirect(appId, shuffleId, false);
              }
              long toReleasedSize = spd.getTotalBlockEncodedLength();
              // after each cacheShuffleData call, the `preAllocatedSize` is updated timely.
              manager.releasePreAllocatedSize(toReleasedSize);
              alreadyReleasedSize += toReleasedSize;
              manager.updateCachedBlockIds(
                  appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
            }
          } catch (ExceedHugePartitionHardLimitException e) {
            String errorMsg =
                "ExceedHugePartitionHardLimitException Error happened when shuffleEngine.write for "
                    + shuffleDataInfo
                    + ": "
                    + e.getMessage();
            ShuffleServerMetrics.counterTotalHugePartitionExceedHardLimitNum.inc();
            ret = StatusCode.EXCEED_HUGE_PARTITION_HARD_LIMIT;
            responseMessage = errorMsg;
            LOG.error(errorMsg);
            hasFailureOccurred = true;
            // Stop here like every other failure path. Continuing would let a later successful
            // partition overwrite `ret` with SUCCESS and, because `hasFailureOccurred` stays
            // true, would make the finally block release memory of data that was cached.
            break;
          } catch (Exception e) {
            String errorMsg =
                "Error happened when shuffleEngine.write for "
                    + shuffleDataInfo
                    + ": "
                    + e.getMessage();
            ret = StatusCode.INTERNAL_ERROR;
            responseMessage = errorMsg;
            // Pass the throwable so the stack trace of the unexpected failure is kept in the log.
            LOG.error(errorMsg, e);
            hasFailureOccurred = true;
            break;
          } finally {
            // Runs for the failing partition only (the loop is left right after it): its data was
            // not cached, so the buffer memory accounted for it is handed back.
            if (hasFailureOccurred) {
              shuffleServer
                  .getShuffleBufferManager()
                  .releaseMemory(spd.getTotalBlockEncodedLength(), false, false);
            }
          }
        }
        // since the required buffer id is only used once, the shuffle client would try to require
        // another buffer whether current connection succeeded or not. Therefore, the
        // preAllocatedBuffer is first get and removed, then after cacheShuffleData finishes, the
        // preAllocatedSize should be updated accordingly.
        if (info.getRequireSize() > alreadyReleasedSize) {
          manager.releasePreAllocatedSize(info.getRequireSize() - alreadyReleasedSize);
        }
        reply =
            SendShuffleDataResponse.newBuilder()
                .setStatus(ret.toProto())
                .setRetMsg(responseMessage)
                .build();
        long costTime = System.currentTimeMillis() - start;
        shuffleServer
            .getGrpcMetrics()
            .recordProcessTime(ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD, costTime);
        if (LOG.isDebugEnabled()) {
          LOG.debug(
              "Cache Shuffle Data for appId["
                  + appId
                  + "], shuffleId["
                  + shuffleId
                  + "], cost "
                  + costTime
                  + " ms with "
                  + shufflePartitionedDataList.size()
                  + " blocks and "
                  + requireSize
                  + " bytes");
        }
      } else {
        reply =
            SendShuffleDataResponse.newBuilder()
                .setStatus(StatusCode.INTERNAL_ERROR.toProto())
                .setRetMsg("No data in request")
                .build();
      }

      auditContext.withStatusCode(StatusCode.fromProto(reply.getStatus()));
      responseObserver.onNext(reply);
      responseObserver.onCompleted();
    }
  }