public void handleGetSortedShuffleDataRequest()

in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [774:875]


  /**
   * Serves a {@link GetSortedShuffleDataRequest} and always writes exactly one {@link
   * GetSortedShuffleDataResponse} back on the client's channel.
   *
   * <p>Processing steps, in order:
   *
   * <ol>
   *   <li>Record the transport time when the request carries a positive timestamp that is
   *       earlier than the local receive time.
   *   <li>Reply with {@code INTERNAL_ERROR} when remote merge is disabled on this server.
   *   <li>Ask the merge manager for the state and size of the requested block. When the block
   *       cannot be read yet, reply with the merge state only: status stays {@code SUCCESS}, the
   *       message is the state name and the payload is an empty buffer.
   *   <li>Reserve read memory for the block; reply with {@code NO_BUFFER} when that fails.
   *   <li>Read the block and send it with next block id {@code blockId + 1}. Any exception on
   *       this path releases the reserved memory and the fetched data, then an {@code
   *       INTERNAL_ERROR} response is sent.
   * </ol>
   *
   * @param client transport client whose channel receives the response
   * @param req request identifying the app, shuffle, partition and block to fetch
   */
  public void handleGetSortedShuffleDataRequest(
      TransportClient client, GetSortedShuffleDataRequest req) {
    final long start = System.currentTimeMillis();
    final long requestId = req.getRequestId();
    final String appId = req.getAppId();
    final int shuffleId = req.getShuffleId();
    final int partitionId = req.getPartitionId();
    final long blockId = req.getBlockId();
    final long sentAt = req.getTimestamp();

    // A sample is recorded only for a positive timestamp that precedes the local receive time.
    final long transportTime = start - sentAt;
    if (sentAt > 0 && transportTime > 0) {
      shuffleServer
          .getNettyMetrics()
          .recordTransportTime(GetSortedShuffleDataRequest.class.getName(), transportTime);
    }

    // Guard 1: the feature is switched off on this server.
    if (!shuffleServer.isRemoteMergeEnable()) {
      GetSortedShuffleDataResponse disabledReply =
          new GetSortedShuffleDataResponse(
              requestId,
              StatusCode.INTERNAL_ERROR,
              "Remote merge is disabled",
              -1,
              MergeState.INTERNAL_ERROR.code(),
              Unpooled.EMPTY_BUFFER);
      client.getChannel().writeAndFlush(disabledReply);
      return;
    }

    final MergeStatus mergeStatus =
        shuffleServer.getShuffleMergeManager().tryGetBlock(appId, shuffleId, partitionId, blockId);
    final MergeState mergeState = mergeStatus.getState();
    final long readBlockSize = mergeStatus.getSize();

    // Guard 2: nothing to read for this block right now.
    // NOTE(review): a size of -1 presumably means "no block available yet" - confirm against
    // the merge manager.
    final boolean hasNoSize = readBlockSize == -1;
    final boolean stateOnlyReply =
        mergeState == MergeState.INITED
            || mergeState == MergeState.INTERNAL_ERROR
            || (hasNoSize && (mergeState == MergeState.MERGING || mergeState == MergeState.DONE));
    if (stateOnlyReply) {
      // The status stays SUCCESS here; the merge state code carries the actual outcome.
      GetSortedShuffleDataResponse stateReply =
          new GetSortedShuffleDataResponse(
              requestId,
              StatusCode.SUCCESS,
              mergeState.name(),
              -1,
              mergeState.code(),
              Unpooled.EMPTY_BUFFER);
      client.getChannel().writeAndFlush(stateReply);
      return;
    }

    // Used for logging and handed to the write listener.
    final String requestInfo =
        "appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId
            + "], blockId[" + blockId + "]";

    // Guard 3: the read buffer could not be reserved.
    if (!shuffleServer.getShuffleBufferManager().requireReadMemory(readBlockSize)) {
      final String noBufferMsg = "Can't require read memory to get sorted shuffle data";
      LOG.error(noBufferMsg + " for " + requestInfo);
      GetSortedShuffleDataResponse noBufferReply =
          new GetSortedShuffleDataResponse(
              requestId,
              StatusCode.NO_BUFFER,
              noBufferMsg,
              -1,
              mergeState.code(),
              Unpooled.EMPTY_BUFFER);
      client.getChannel().writeAndFlush(noBufferReply);
      return;
    }

    // Read memory is reserved from here on; the catch block must hand it back on failure.
    ShuffleDataResult sdr = null;
    try {
      sdr =
          shuffleServer
              .getShuffleMergeManager()
              .getShuffleData(appId, shuffleId, partitionId, blockId);

      GetSortedShuffleDataResponse dataReply =
          new GetSortedShuffleDataResponse(
              requestId,
              StatusCode.SUCCESS,
              "OK",
              blockId + 1,
              mergeState.code(),
              sdr.getManagedBuffer());

      // NOTE(review): judging by its name, the listener releases the reserved read memory and
      // records the read time once the write completes - confirm in the listener class.
      ReleaseMemoryAndRecordReadTimeListener onWriteDone =
          new ReleaseMemoryAndRecordReadTimeListener(
              start, readBlockSize, sdr.getDataLength(), requestInfo, req, dataReply, client);

      client.getChannel().writeAndFlush(dataReply).addListener(onWriteDone);
    } catch (Exception e) {
      shuffleServer.getShuffleBufferManager().releaseReadMemory(readBlockSize);
      if (sdr != null) {
        sdr.release();
      }
      final String errorMsg =
          "Error happened when get shuffle data for " + requestInfo + ", " + e.getMessage();
      LOG.error(errorMsg, e);
      GetSortedShuffleDataResponse errorReply =
          new GetSortedShuffleDataResponse(
              requestId,
              StatusCode.INTERNAL_ERROR,
              errorMsg,
              -1,
              MergeState.INTERNAL_ERROR.code(),
              Unpooled.EMPTY_BUFFER);
      client.getChannel().writeAndFlush(errorReply);
    }
  }