public void getLocalShuffleData()

in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [569:675]


  public void getLocalShuffleData(
      GetLocalShuffleDataRequest request,
      StreamObserver<GetLocalShuffleDataResponse> responseObserver) {
    String appId = request.getAppId();
    int shuffleId = request.getShuffleId();
    int partitionId = request.getPartitionId();
    int partitionNumPerRange = request.getPartitionNumPerRange();
    int partitionNum = request.getPartitionNum();
    long offset = request.getOffset();
    int length = request.getLength();
    long timestamp = request.getTimestamp();
    if (timestamp > 0) {
      long transportTime = System.currentTimeMillis() - timestamp;
      if (transportTime > 0) {
        shuffleServer
            .getGrpcMetrics()
            .recordTransportTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, transportTime);
      }
    }
    String storageType =
        shuffleServer.getShuffleServerConf().get(RssBaseConf.RSS_STORAGE_TYPE).name();
    StatusCode status = StatusCode.SUCCESS;
    String msg = "OK";
    GetLocalShuffleDataResponse reply = null;
    ShuffleDataResult sdr;
    String requestInfo =
        "appId["
            + appId
            + "], shuffleId["
            + shuffleId
            + "], partitionId["
            + partitionId
            + "]"
            + "offset["
            + offset
            + "]"
            + "length["
            + length
            + "]";

    int[] range =
        ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum);
    Storage storage =
        shuffleServer
            .getStorageManager()
            .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0]));
    if (storage != null) {
      storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
    }

    if (shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(length)) {
      try {
        long start = System.currentTimeMillis();
        sdr =
            shuffleServer
                .getShuffleTaskManager()
                .getShuffleData(
                    appId,
                    shuffleId,
                    partitionId,
                    partitionNumPerRange,
                    partitionNum,
                    storageType,
                    offset,
                    length);
        long readTime = System.currentTimeMillis() - start;
        ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
        ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getData().length);
        ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getData().length);
        shuffleServer
            .getGrpcMetrics()
            .recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, readTime);
        LOG.info(
            "Successfully getShuffleData cost {} ms for shuffle" + " data with {}",
            readTime,
            requestInfo);
        reply =
            GetLocalShuffleDataResponse.newBuilder()
                .setStatus(status.toProto())
                .setRetMsg(msg)
                .setData(UnsafeByteOperations.unsafeWrap(sdr.getData()))
                .build();
      } catch (Exception e) {
        status = StatusCode.INTERNAL_ERROR;
        msg = "Error happened when get shuffle data for " + requestInfo + ", " + e.getMessage();
        LOG.error(msg, e);
        reply =
            GetLocalShuffleDataResponse.newBuilder()
                .setStatus(status.toProto())
                .setRetMsg(msg)
                .build();
      } finally {
        shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
      }
    } else {
      status = StatusCode.INTERNAL_ERROR;
      msg = "Can't require memory to get shuffle data";
      LOG.error(msg + " for " + requestInfo);
      reply =
          GetLocalShuffleDataResponse.newBuilder()
              .setStatus(status.toProto())
              .setRetMsg(msg)
              .build();
    }
    responseObserver.onNext(reply);
    responseObserver.onCompleted();
  }