public void handleGetMemoryShuffleDataRequest()

in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [216:296]


  /**
   * Serves a request for shuffle data looked up through {@code getInMemoryShuffleData}.
   *
   * <p>Before the lookup, read memory sized by {@code req.getReadBufferSize()} is required from
   * the shuffle buffer manager. If it cannot be obtained, an {@code INTERNAL_ERROR} response is
   * sent immediately. Otherwise the data is looked up and a response (success, or {@code
   * INTERNAL_ERROR} if the lookup threw) is written to the client's channel.
   *
   * <p>The read memory is released only once the write of the response has completed (either
   * successfully or with a failure), because the write is asynchronous and the payload may still
   * be pending after {@code writeAndFlush} returns.
   *
   * @param client the transport client whose channel receives the response
   * @param req the request carrying the application, shuffle and partition identifiers, the last
   *     block id, the read buffer size, the expected task id bitmap and the send timestamp
   */
  public void handleGetMemoryShuffleDataRequest(
      TransportClient client, GetMemoryShuffleDataRequest req) {
    String appId = req.getAppId();
    int shuffleId = req.getShuffleId();
    int partitionId = req.getPartitionId();
    long blockId = req.getLastBlockId();
    int readBufferSize = req.getReadBufferSize();
    long timestamp = req.getTimestamp();

    // A non-positive timestamp is treated as "not provided"; negative deltas are not recorded.
    if (timestamp > 0) {
      long transportTime = System.currentTimeMillis() - timestamp;
      if (transportTime > 0) {
        shuffleServer
            .getNettyMetrics()
            .recordTransportTime(GetMemoryShuffleDataRequest.class.getName(), transportTime);
      }
    }
    long start = System.currentTimeMillis();
    String requestInfo =
        "appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]";

    // todo: if can get the exact memory size?
    if (!shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(readBufferSize)) {
      // Nothing was reserved, so there is nothing to release on this path.
      String rejectMsg = "Can't require memory to get in memory shuffle data";
      LOG.error("{} for {}", rejectMsg, requestInfo);
      client
          .getChannel()
          .writeAndFlush(
              new GetMemoryShuffleDataResponse(
                  req.getRequestId(),
                  StatusCode.INTERNAL_ERROR,
                  rejectMsg,
                  Lists.newArrayList(),
                  Unpooled.EMPTY_BUFFER));
      return;
    }

    GetMemoryShuffleDataResponse response;
    try {
      ShuffleDataResult shuffleDataResult =
          shuffleServer
              .getShuffleTaskManager()
              .getInMemoryShuffleData(
                  appId,
                  shuffleId,
                  partitionId,
                  blockId,
                  readBufferSize,
                  req.getExpectedTaskIdsBitmap());
      ByteBuf data = Unpooled.EMPTY_BUFFER;
      List<BufferSegment> bufferSegments = Lists.newArrayList();
      if (shuffleDataResult != null) {
        data = Unpooled.wrappedBuffer(shuffleDataResult.getDataBuffer());
        bufferSegments = shuffleDataResult.getBufferSegments();
        ShuffleServerMetrics.counterTotalReadDataSize.inc(data.readableBytes());
        ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.readableBytes());
      }
      long costTime = System.currentTimeMillis() - start;
      shuffleServer
          .getNettyMetrics()
          .recordProcessTime(GetMemoryShuffleDataRequest.class.getName(), costTime);
      LOG.info(
          "Successfully getInMemoryShuffleData cost {} ms with {} bytes shuffle data for {}",
          costTime,
          data.readableBytes(),
          requestInfo);

      response =
          new GetMemoryShuffleDataResponse(
              req.getRequestId(), StatusCode.SUCCESS, "OK", bufferSegments, data);
    } catch (Exception e) {
      String errorMsg =
          "Error happened when get in memory shuffle data for "
              + requestInfo
              + ", "
              + e.getMessage();
      LOG.error(errorMsg, e);
      response =
          new GetMemoryShuffleDataResponse(
              req.getRequestId(),
              StatusCode.INTERNAL_ERROR,
              errorMsg,
              Lists.newArrayList(),
              Unpooled.EMPTY_BUFFER);
    }

    try {
      // Keep the reservation until the asynchronous write has finished; releasing it earlier
      // would let other readers claim memory that this response is still occupying.
      client
          .getChannel()
          .writeAndFlush(response)
          .addListener(
              future -> {
                shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
                if (!future.isSuccess()) {
                  LOG.error(
                      "Failed to send in memory shuffle data response for {}",
                      requestInfo,
                      future.cause());
                }
              });
    } catch (RuntimeException e) {
      // The write could not be issued at all, so no listener was attached: release here
      // to keep the read-memory accounting balanced, then propagate as before.
      shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
      throw e;
    }
  }