public void getMemoryShuffleData()

in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [1396:1528]


  /**
   * Handles a request to read shuffle data of one partition through the in-memory read path.
   *
   * <p>Processing steps, in order:
   *
   * <ol>
   *   <li>Record the app id, shuffle id and request arguments in the audit context.
   *   <li>Run {@code verifyRequest(appId)}; any status other than {@code SUCCESS} is returned to
   *       the caller immediately.
   *   <li>If the request carries a positive timestamp, record the (positive) transport time.
   *   <li>Reserve {@code readBufferSize} bytes of read memory. If the reservation is refused, reply
   *       with {@code NO_BUFFER}.
   *   <li>Otherwise fetch the data, update metrics and build the reply. Any {@link Exception}
   *       raised while doing so is logged and turned into an {@code INTERNAL_ERROR} reply.
   * </ol>
   *
   * <p>Exactly one response is sent through {@code responseObserver}, followed by {@code
   * onCompleted()}, on every path that returns normally.
   *
   * @param request the read request (app id, shuffle id, partition id, last block id, read buffer
   *     size, optional serialized expected-task-id bitmap, optional timestamp)
   * @param responseObserver the observer that receives the single response
   */
  public void getMemoryShuffleData(
      GetMemoryShuffleDataRequest request,
      StreamObserver<GetMemoryShuffleDataResponse> responseObserver) {
    try (ServerRpcAuditContext auditContext = createAuditContext("getMemoryShuffleData")) {
      String appId = request.getAppId();
      int shuffleId = request.getShuffleId();
      int partitionId = request.getPartitionId();
      long blockId = request.getLastBlockId();
      int readBufferSize = request.getReadBufferSize();

      auditContext.withAppId(appId).withShuffleId(shuffleId);
      auditContext.withArgs(
          "partitionId="
              + partitionId
              + ", blockId="
              + blockId
              + ", readBufferSize="
              + readBufferSize);

      StatusCode status = verifyRequest(appId);
      if (status != StatusCode.SUCCESS) {
        // Verification failed: answer with the failing status and no payload.
        auditContext.withStatusCode(status);
        GetMemoryShuffleDataResponse reply =
            GetMemoryShuffleDataResponse.newBuilder()
                .setStatus(status.toProto())
                .setRetMsg(status.toString())
                .build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
        return;
      }

      // Transport time is only recorded when the client supplied a timestamp and the
      // computed difference is positive.
      long timestamp = request.getTimestamp();
      if (timestamp > 0) {
        long transportTime = System.currentTimeMillis() - timestamp;
        if (transportTime > 0) {
          shuffleServer
              .getGrpcMetrics()
              .recordTransportTime(
                  ShuffleServerGrpcMetrics.GET_MEMORY_SHUFFLE_DATA_METHOD, transportTime);
        }
      }
      String msg = "OK";
      GetMemoryShuffleDataResponse reply;
      String requestInfo =
          "appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]";

      // todo: if can get the exact memory size?
      if (shuffleServer.getShuffleBufferManager().requireReadMemory(readBufferSize)) {
        ShuffleDataResult shuffleDataResult = null;
        // Tracks whether the read gauges were actually incremented, so that the cleanup below
        // never decrements a gauge that was not incremented (e.g. when reading the result
        // fails before the increments are reached).
        boolean readGaugesIncremented = false;
        try {
          final long start = System.currentTimeMillis();
          Roaring64NavigableMap expectedTaskIds = null;
          if (request.getSerializedExpectedTaskIdsBitmap() != null
              && !request.getSerializedExpectedTaskIdsBitmap().isEmpty()) {
            expectedTaskIds =
                RssUtils.deserializeBitMap(
                    request.getSerializedExpectedTaskIdsBitmap().toByteArray());
          }
          shuffleDataResult =
              shuffleServer
                  .getShuffleTaskManager()
                  .getInMemoryShuffleData(
                      appId, shuffleId, partitionId, blockId, readBufferSize, expectedTaskIds);
          // A null result is answered with an empty payload and SUCCESS status.
          byte[] data = new byte[] {};
          List<BufferSegment> bufferSegments = Lists.newArrayList();
          if (shuffleDataResult != null) {
            data = shuffleDataResult.getData();
            bufferSegments = shuffleDataResult.getBufferSegments();
            ShuffleServerMetrics.counterTotalReadDataSize.inc(data.length);
            ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.length);
            ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.inc();
            ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.inc(readBufferSize);
            readGaugesIncremented = true;
          }
          long costTime = System.currentTimeMillis() - start;
          shuffleServer
              .getGrpcMetrics()
              .recordProcessTime(ShuffleServerGrpcMetrics.GET_MEMORY_SHUFFLE_DATA_METHOD, costTime);
          LOG.info(
              "Successfully getInMemoryShuffleData cost {} ms with {} bytes shuffle data for {}",
              costTime,
              data.length,
              requestInfo);
          auditContext.withReturnValue(
              "len=" + data.length + ", bufferSegmentSize=" + bufferSegments.size());
          // NOTE(review): the reply wraps `data` without copying and is sent only after
          // shuffleDataResult.release() has run in the finally block below. This presumably
          // relies on getData() returning an array that remains valid after release - confirm.
          reply =
              GetMemoryShuffleDataResponse.newBuilder()
                  .setStatus(status.toProto())
                  .setRetMsg(msg)
                  .setData(UnsafeByteOperations.unsafeWrap(data))
                  .addAllShuffleDataBlockSegments(toShuffleDataBlockSegments(bufferSegments))
                  .build();
        } catch (Exception e) {
          status = StatusCode.INTERNAL_ERROR;
          msg =
              "Error happened when get in memory shuffle data for "
                  + requestInfo
                  + ", "
                  + e.getMessage();
          LOG.error(msg, e);
          reply = buildEmptyMemoryShuffleDataResponse(status, msg);
        } finally {
          try {
            if (shuffleDataResult != null) {
              shuffleDataResult.release();
            }
          } finally {
            // Runs even if release() throws, so the reserved read memory is always returned.
            if (readGaugesIncremented) {
              ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.dec();
              ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.dec(readBufferSize);
            }
            shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
          }
        }
      } else {
        status = StatusCode.NO_BUFFER;
        msg = "Can't require memory to get in memory shuffle data";
        LOG.warn("{} for {}", msg, requestInfo);
        reply = buildEmptyMemoryShuffleDataResponse(status, msg);
      }

      auditContext.withStatusCode(status);
      responseObserver.onNext(reply);
      responseObserver.onCompleted();
    }
  }

  /** Builds a response with the given status and message, empty data and no block segments. */
  private static GetMemoryShuffleDataResponse buildEmptyMemoryShuffleDataResponse(
      StatusCode status, String msg) {
    return GetMemoryShuffleDataResponse.newBuilder()
        .setData(UnsafeByteOperations.unsafeWrap(new byte[] {}))
        .addAllShuffleDataBlockSegments(Lists.newArrayList())
        .setStatus(status.toProto())
        .setRetMsg(msg)
        .build();
  }