public void getMemoryShuffleData()

in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [764:863]


  public void getMemoryShuffleData(
      GetMemoryShuffleDataRequest request,
      StreamObserver<GetMemoryShuffleDataResponse> responseObserver) {
    String appId = request.getAppId();
    int shuffleId = request.getShuffleId();
    int partitionId = request.getPartitionId();
    long blockId = request.getLastBlockId();
    int readBufferSize = request.getReadBufferSize();
    long timestamp = request.getTimestamp();

    if (timestamp > 0) {
      long transportTime = System.currentTimeMillis() - timestamp;
      if (transportTime > 0) {
        shuffleServer
            .getGrpcMetrics()
            .recordTransportTime(
                ShuffleServerGrpcMetrics.GET_MEMORY_SHUFFLE_DATA_METHOD, transportTime);
      }
    }
    long start = System.currentTimeMillis();
    StatusCode status = StatusCode.SUCCESS;
    String msg = "OK";
    GetMemoryShuffleDataResponse reply;
    String requestInfo =
        "appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]";

    // todo: if can get the exact memory size?
    if (shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(readBufferSize)) {
      try {
        Roaring64NavigableMap expectedTaskIds = null;
        if (request.getSerializedExpectedTaskIdsBitmap() != null
            && !request.getSerializedExpectedTaskIdsBitmap().isEmpty()) {
          expectedTaskIds =
              RssUtils.deserializeBitMap(
                  request.getSerializedExpectedTaskIdsBitmap().toByteArray());
        }
        ShuffleDataResult shuffleDataResult =
            shuffleServer
                .getShuffleTaskManager()
                .getInMemoryShuffleData(
                    appId, shuffleId, partitionId, blockId, readBufferSize, expectedTaskIds);
        byte[] data = new byte[] {};
        List<BufferSegment> bufferSegments = Lists.newArrayList();
        if (shuffleDataResult != null) {
          data = shuffleDataResult.getData();
          bufferSegments = shuffleDataResult.getBufferSegments();
          ShuffleServerMetrics.counterTotalReadDataSize.inc(data.length);
          ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.length);
        }
        long costTime = System.currentTimeMillis() - start;
        shuffleServer
            .getGrpcMetrics()
            .recordProcessTime(ShuffleServerGrpcMetrics.GET_MEMORY_SHUFFLE_DATA_METHOD, costTime);
        LOG.info(
            "Successfully getInMemoryShuffleData cost {} ms with {} bytes shuffle" + " data for {}",
            costTime,
            data.length,
            requestInfo);

        reply =
            GetMemoryShuffleDataResponse.newBuilder()
                .setStatus(status.toProto())
                .setRetMsg(msg)
                .setData(UnsafeByteOperations.unsafeWrap(data))
                .addAllShuffleDataBlockSegments(toShuffleDataBlockSegments(bufferSegments))
                .build();
      } catch (Exception e) {
        status = StatusCode.INTERNAL_ERROR;
        msg =
            "Error happened when get in memory shuffle data for "
                + requestInfo
                + ", "
                + e.getMessage();
        LOG.error(msg, e);
        reply =
            GetMemoryShuffleDataResponse.newBuilder()
                .setData(UnsafeByteOperations.unsafeWrap(new byte[] {}))
                .addAllShuffleDataBlockSegments(Lists.newArrayList())
                .setStatus(status.toProto())
                .setRetMsg(msg)
                .build();
      } finally {
        shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
      }
    } else {
      status = StatusCode.INTERNAL_ERROR;
      msg = "Can't require memory to get in memory shuffle data";
      LOG.error(msg + " for " + requestInfo);
      reply =
          GetMemoryShuffleDataResponse.newBuilder()
              .setData(UnsafeByteOperations.unsafeWrap(new byte[] {}))
              .addAllShuffleDataBlockSegments(Lists.newArrayList())
              .setStatus(status.toProto())
              .setRetMsg(msg)
              .build();
    }

    responseObserver.onNext(reply);
    responseObserver.onCompleted();
  }