public void getLocalShuffleData()

in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [1117:1265]


  public void getLocalShuffleData(
      GetLocalShuffleDataRequest request,
      StreamObserver<GetLocalShuffleDataResponse> responseObserver) {
    try (ServerRpcAuditContext auditContext = createAuditContext("getLocalShuffleData")) {
      String appId = request.getAppId();
      int shuffleId = request.getShuffleId();
      int partitionId = request.getPartitionId();
      int partitionNumPerRange = request.getPartitionNumPerRange();
      int partitionNum = request.getPartitionNum();
      long offset = request.getOffset();
      int length = request.getLength();
      int storageId = request.getStorageId();

      auditContext.withAppId(appId).withShuffleId(shuffleId);
      auditContext.withArgs(
          "partitionId="
              + partitionId
              + ", partitionNumPerRange="
              + partitionNumPerRange
              + ", partitionNum="
              + partitionNum
              + ", offset="
              + offset
              + ", length="
              + length
              + ", storageId="
              + storageId);

      StatusCode status = verifyRequest(appId);
      if (status != StatusCode.SUCCESS) {
        auditContext.withStatusCode(status);
        GetLocalShuffleDataResponse response =
            GetLocalShuffleDataResponse.newBuilder()
                .setStatus(status.toProto())
                .setRetMsg(status.toString())
                .build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
        return;
      }

      long timestamp = request.getTimestamp();
      if (timestamp > 0) {
        long transportTime = System.currentTimeMillis() - timestamp;
        if (transportTime > 0) {
          shuffleServer
              .getGrpcMetrics()
              .recordTransportTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, transportTime);
        }
      }
      String storageType =
          shuffleServer.getShuffleServerConf().get(RssBaseConf.RSS_STORAGE_TYPE).name();
      String msg = "OK";
      GetLocalShuffleDataResponse reply = null;
      ShuffleDataResult sdr = null;
      String requestInfo =
          "appId["
              + appId
              + "], shuffleId["
              + shuffleId
              + "], partitionId["
              + partitionId
              + "]"
              + "offset["
              + offset
              + "]"
              + "length["
              + length
              + "]";

      int[] range =
          ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum);
      Storage storage =
          shuffleServer
              .getStorageManager()
              .selectStorage(
                  new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0], storageId));
      if (storage != null) {
        storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
      }

      if (shuffleServer.getShuffleBufferManager().requireReadMemory(length)) {
        try {
          long start = System.currentTimeMillis();
          sdr =
              shuffleServer
                  .getShuffleTaskManager()
                  .getShuffleData(
                      appId,
                      shuffleId,
                      partitionId,
                      partitionNumPerRange,
                      partitionNum,
                      storageType,
                      offset,
                      length,
                      storageId);
          reply =
              GetLocalShuffleDataResponse.newBuilder()
                  .setStatus(status.toProto())
                  .setRetMsg(msg)
                  .setData(UnsafeByteOperations.unsafeWrap(sdr.getData()))
                  .build();
          long readTime = System.currentTimeMillis() - start;
          ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
          ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getDataLength());
          ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getDataLength());
          ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.inc();
          ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.inc(length);
          shuffleServer
              .getGrpcMetrics()
              .recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, readTime);
          LOG.info(
              "Successfully getShuffleData cost {} ms for shuffle data with {}",
              readTime,
              requestInfo);
        } catch (Exception e) {
          status = StatusCode.INTERNAL_ERROR;
          msg = "Error happened when get shuffle data for " + requestInfo + ", " + e.getMessage();
          LOG.error(msg, e);
          reply =
              GetLocalShuffleDataResponse.newBuilder()
                  .setStatus(status.toProto())
                  .setRetMsg(msg)
                  .build();
        } finally {
          if (sdr != null) {
            sdr.release();
            ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.dec();
            ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.dec(length);
          }
          shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
        }
      } else {
        status = StatusCode.NO_BUFFER;
        msg = "Can't require memory to get shuffle data";
        LOG.warn("{} for {}", msg, requestInfo);
        reply =
            GetLocalShuffleDataResponse.newBuilder()
                .setStatus(status.toProto())
                .setRetMsg(msg)
                .build();
      }
      auditContext.withStatusCode(status);
      auditContext.withReturnValue("len=" + (sdr == null ? 0 : sdr.getDataLength()));
      responseObserver.onNext(reply);
      responseObserver.onCompleted();
    }
  }