public void getSortedShuffleData()

in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [1581:1749]


  /**
   * Serves one merged (sorted) block of a shuffle partition to the caller.
   *
   * <p>Exactly one response is sent on {@code responseObserver}, followed by completion:
   *
   * <ul>
   *   <li>{@code INTERNAL_ERROR} with "Remote merge is disabled" when the server has remote merge
   *       turned off.
   *   <li>A state-only response (no data) when the merge manager reports {@code INITED}, reports
   *       {@code MERGING} or {@code DONE} with a block size of {@code -1}, or reports {@code
   *       INTERNAL_ERROR}. In the {@code MERGING}/{@code DONE} case the next block id is set to
   *       {@code -1}.
   *   <li>{@code NO_BUFFER} when read memory for the block cannot be reserved.
   *   <li>Otherwise the block's bytes with the next block id set to {@code blockId + 1}, or {@code
   *       INTERNAL_ERROR} if reading the block throws.
   * </ul>
   *
   * @param request identifies the application, shuffle, partition and merged block to read, and
   *     carries the client-side send timestamp used for the transport-time metric
   * @param responseObserver receives the single response and the completion signal
   */
  public void getSortedShuffleData(
      RssProtos.GetSortedShuffleDataRequest request,
      StreamObserver<RssProtos.GetSortedShuffleDataResponse> responseObserver) {
    try (ServerRpcAuditContext auditContext = createAuditContext("getSortedShuffleData")) {
      String appId = request.getAppId();
      int shuffleId = request.getShuffleId();
      int partitionId = request.getPartitionId();
      long blockId = request.getMergedBlockId();
      long timestamp = request.getTimestamp();
      auditContext
          .withAppId(appId)
          .withShuffleId(shuffleId)
          .withArgs(String.format("partitionId=%d, blockId=%d", partitionId, blockId));

      // Transport time is only recorded when the request carries a timestamp and the computed
      // delta is positive.
      if (timestamp > 0) {
        long transportTime = System.currentTimeMillis() - timestamp;
        if (transportTime > 0) {
          shuffleServer
              .getGrpcMetrics()
              .recordTransportTime(
                  ShuffleServerGrpcMetrics.GET_SORTED_SHUFFLE_DATA_METHOD, transportTime);
        }
      }

      if (!shuffleServer.isRemoteMergeEnable()) {
        responseObserver.onNext(
            RssProtos.GetSortedShuffleDataResponse.newBuilder()
                .setStatus(StatusCode.INTERNAL_ERROR.toProto())
                .setRetMsg("Remote merge is disabled")
                .build());
        responseObserver.onCompleted();
        return;
      }

      MergeStatus mergeStatus =
          shuffleServer
              .getShuffleMergeManager()
              .tryGetBlock(appId, shuffleId, partitionId, blockId);
      MergeState mergeState = mergeStatus.getState();
      long blockSize = mergeStatus.getSize();

      // The three situations below are mutually exclusive (they test different states) and all
      // answer with the merge state only, without reading any data.
      boolean notStarted = mergeState == MergeState.INITED;
      // MERGING: all merged data has been read, but there may be data that has not yet been
      // merged. DONE: all data has been read. Both are signalled with nextBlockId = -1.
      boolean nothingLeftToRead =
          blockSize == -1 && (mergeState == MergeState.MERGING || mergeState == MergeState.DONE);
      boolean mergeFailed = mergeState == MergeState.INTERNAL_ERROR;
      if (notStarted || nothingLeftToRead || mergeFailed) {
        StatusCode stateStatus = mergeFailed ? StatusCode.INTERNAL_ERROR : StatusCode.SUCCESS;
        RssProtos.GetSortedShuffleDataResponse.Builder stateReply =
            RssProtos.GetSortedShuffleDataResponse.newBuilder()
                .setStatus(stateStatus.toProto())
                .setRetMsg(mergeState.name())
                .setMState(mergeState.code());
        if (nothingLeftToRead) {
          stateReply.setNextBlockId(-1);
        }
        responseObserver.onNext(stateReply.build());
        responseObserver.onCompleted();
        return;
      }

      String requestInfo =
          String.format(
              "appId[%s], shuffleId[%d], partitionId[%d], blockId[%d]",
              appId, shuffleId, partitionId, blockId);

      if (!shuffleServer.getShuffleBufferManager().requireReadMemory(blockSize)) {
        String noBufferMsg = "Can't require read memory to get sorted shuffle data";
        LOG.error("{} for {}", noBufferMsg, requestInfo);
        responseObserver.onNext(
            RssProtos.GetSortedShuffleDataResponse.newBuilder()
                .setStatus(StatusCode.NO_BUFFER.toProto())
                .setRetMsg(noBufferMsg)
                .build());
        responseObserver.onCompleted();
        return;
      }

      RssProtos.GetSortedShuffleDataResponse reply;
      ShuffleDataResult sdr = null;
      try {
        long start = System.currentTimeMillis();
        sdr =
            shuffleServer
                .getShuffleMergeManager()
                .getShuffleData(appId, shuffleId, partitionId, blockId);
        long readTime = System.currentTimeMillis() - start;
        int dataLength = sdr.getDataLength();
        ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
        ShuffleServerMetrics.counterTotalReadDataSize.inc(dataLength);
        ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(dataLength);
        shuffleServer
            .getGrpcMetrics()
            .recordProcessTime(ShuffleServerGrpcMetrics.GET_SORTED_SHUFFLE_DATA_METHOD, readTime);
        LOG.info(
            "Successfully getSortedShuffleData cost {} ms for shuffle"
                + " data with {}, length is {}, state is {}",
            readTime,
            requestInfo,
            dataLength,
            mergeState);
        auditContext.withReturnValue("len=" + dataLength);
        // NOTE(review): unsafeWrap does not copy the array, and sdr is released in the finally
        // block below before the reply is handed to responseObserver - confirm that the array
        // returned by sdr.getData() remains valid after sdr.release().
        reply =
            RssProtos.GetSortedShuffleDataResponse.newBuilder()
                .setNextBlockId(blockId + 1) // next block id
                .setMState(mergeState.code())
                .setStatus(StatusCode.SUCCESS.toProto())
                .setRetMsg("OK")
                .setData(UnsafeByteOperations.unsafeWrap(sdr.getData(), 0, dataLength))
                .build();
      } catch (Exception e) {
        String errorMsg =
            "Error happened when get shuffle data for " + requestInfo + ", " + e.getMessage();
        LOG.error(errorMsg, e);
        reply =
            RssProtos.GetSortedShuffleDataResponse.newBuilder()
                .setStatus(StatusCode.INTERNAL_ERROR.toProto())
                .setRetMsg(errorMsg)
                .build();
      } finally {
        // Always give back what was taken above: the data result (if one was obtained) and the
        // read-memory reservation made by requireReadMemory.
        if (sdr != null) {
          sdr.release();
        }
        shuffleServer.getShuffleBufferManager().releaseReadMemory(blockSize);
      }
      responseObserver.onNext(reply);
      responseObserver.onCompleted();
    }
  }