public void handleGetLocalShuffleData()

in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [380:476]


  /**
   * Handles a {@link GetLocalShuffleDataRequest} by reading the requested slice of shuffle data
   * through the shuffle task manager and writing a {@link GetLocalShuffleDataResponse} back on the
   * client's channel.
   *
   * <p>Processing steps:
   *
   * <ol>
   *   <li>If the request carries a positive timestamp, the elapsed transport time is recorded.
   *   <li>The storage selected for the partition (if any) gets its read metrics updated.
   *   <li>{@code length} bytes of read memory are reserved from the shuffle buffer manager. If the
   *       reservation fails, an {@code INTERNAL_ERROR} response with an empty buffer is sent.
   *   <li>The data is read, read metrics are updated, and the response is sent. Any {@link
   *       Exception} during this step is logged and reported to the client as {@code
   *       INTERNAL_ERROR} with an empty buffer.
   * </ol>
   *
   * <p>The read-memory reservation is held until the write of a successful response has completed
   * (successfully or not), because the response still references the buffer that was read. On
   * every failure path the reservation is released immediately.
   *
   * @param client the transport client whose channel receives the response
   * @param req the request describing which shuffle data slice to read
   */
  public void handleGetLocalShuffleData(TransportClient client, GetLocalShuffleDataRequest req) {
    String appId = req.getAppId();
    int shuffleId = req.getShuffleId();
    int partitionId = req.getPartitionId();
    int partitionNumPerRange = req.getPartitionNumPerRange();
    int partitionNum = req.getPartitionNum();
    long offset = req.getOffset();
    int length = req.getLength();
    long timestamp = req.getTimestamp();

    // Only record transport time when the sender supplied a timestamp and the computed delta is
    // positive (a non-positive delta is ignored rather than recorded).
    if (timestamp > 0) {
      long transportTime = System.currentTimeMillis() - timestamp;
      if (transportTime > 0) {
        shuffleServer
            .getNettyMetrics()
            .recordTransportTime(GetLocalShuffleDataRequest.class.getName(), transportTime);
      }
    }

    String storageType =
        shuffleServer.getShuffleServerConf().get(RssBaseConf.RSS_STORAGE_TYPE).name();
    // Human-readable request description used in log lines and in error messages returned to the
    // client.
    String requestInfo =
        "appId["
            + appId
            + "], shuffleId["
            + shuffleId
            + "], partitionId["
            + partitionId
            + "]"
            + "offset["
            + offset
            + "]"
            + "length["
            + length
            + "]";

    int[] range =
        ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum);
    Storage storage =
        shuffleServer
            .getStorageManager()
            .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0]));
    if (storage != null) {
      storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
    }

    if (!shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(length)) {
      // Nothing was reserved, so there is nothing to release on this path.
      String msg = "Can't require memory to get shuffle data";
      LOG.error("{} for {}", msg, requestInfo);
      client
          .getChannel()
          .writeAndFlush(
              new GetLocalShuffleDataResponse(
                  req.getRequestId(),
                  StatusCode.INTERNAL_ERROR,
                  msg,
                  new NettyManagedBuffer(Unpooled.EMPTY_BUFFER)));
      return;
    }

    GetLocalShuffleDataResponse response;
    // True only once a response carrying the read buffer has been built; in that case the
    // reservation must outlive this block and is released when the write completes.
    boolean readSucceeded = false;
    try {
      long start = System.currentTimeMillis();
      ShuffleDataResult sdr =
          shuffleServer
              .getShuffleTaskManager()
              .getShuffleData(
                  appId,
                  shuffleId,
                  partitionId,
                  partitionNumPerRange,
                  partitionNum,
                  storageType,
                  offset,
                  length);
      long readTime = System.currentTimeMillis() - start;
      ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
      int dataLength = sdr.getData().length;
      ShuffleServerMetrics.counterTotalReadDataSize.inc(dataLength);
      ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(dataLength);
      shuffleServer
          .getNettyMetrics()
          .recordProcessTime(GetLocalShuffleDataRequest.class.getName(), readTime);
      LOG.info(
          "Successfully getShuffleData cost {} ms for shuffle data with {}", readTime, requestInfo);
      response =
          new GetLocalShuffleDataResponse(
              req.getRequestId(), StatusCode.SUCCESS, "OK", sdr.getManagedBuffer());
      readSucceeded = true;
    } catch (Exception e) {
      // NOTE(review): if the failure happens after getShuffleData returned, the buffer held by
      // the result is not explicitly released here — confirm whether ShuffleDataResult needs an
      // explicit release in that case.
      String msg =
          "Error happened when get shuffle data for " + requestInfo + ", " + e.getMessage();
      LOG.error(msg, e);
      response =
          new GetLocalShuffleDataResponse(
              req.getRequestId(),
              StatusCode.INTERNAL_ERROR,
              msg,
              new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
    } finally {
      if (!readSucceeded) {
        // No read data will be sent, so the reservation can be given back right away. This also
        // covers Throwables that are not caught above.
        shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
      }
    }

    if (!readSucceeded) {
      client.getChannel().writeAndFlush(response);
      return;
    }

    // Assumes client.getChannel() is a Netty Channel, whose writeAndFlush is asynchronous: the
    // read buffer is still referenced by the pending write after this method returns. Keep the
    // reservation until the write future completes (listeners are notified on success and on
    // failure).
    boolean releaseScheduled = false;
    try {
      client
          .getChannel()
          .writeAndFlush(response)
          .addListener(
              future -> shuffleServer.getShuffleBufferManager().releaseReadMemory(length));
      releaseScheduled = true;
    } finally {
      if (!releaseScheduled) {
        // The write could not even be submitted; do not leak the reservation.
        shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
      }
    }
  }