public void handleGetLocalShuffleData()

in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [641:772]


  /**
   * Serves a {@code getLocalShuffleData} request and writes the reply on the client's channel.
   *
   * <p>Steps, in the order they are executed:
   *
   * <ol>
   *   <li>Fill the audit context with the app id, shuffle id and the request arguments.
   *   <li>Call {@code verifyRequest(appId)}; any status other than {@code SUCCESS} is sent back
   *       together with an empty buffer and the method returns.
   *   <li>If the request carries a positive timestamp and the elapsed time is positive, record it
   *       as transport time in the Netty metrics.
   *   <li>Select a storage for the read event and, when one is returned, update its read metrics.
   *   <li>Ask the buffer manager for {@code length} bytes of read memory; if that is refused, reply
   *       with {@code NO_BUFFER} and an empty buffer.
   *   <li>Read the data through the task manager, bump the read counters/gauges, and write the
   *       response with a {@code ReleaseMemoryAndRecordReadTimeListener} attached to the write
   *       future.
   *   <li>If anything in the previous step throws, release the reserved read memory and the read
   *       result (when present) and reply with {@code INTERNAL_ERROR} and an empty buffer.
   * </ol>
   *
   * @param client transport client whose channel receives the response
   * @param req request describing which slice of local shuffle data should be read
   */
  public void handleGetLocalShuffleData(TransportClient client, GetLocalShuffleDataRequest req) {
    try (ServerRpcAuditContext auditContext = createAuditContext("getLocalShuffleData", client)) {
      // Snapshot the request fields once; they are reused for auditing, logging and the read.
      final String appId = req.getAppId();
      final int shuffleId = req.getShuffleId();
      final int partitionId = req.getPartitionId();
      final int numPerRange = req.getPartitionNumPerRange();
      final int totalPartitions = req.getPartitionNum();
      final long readOffset = req.getOffset();
      final int readLength = req.getLength();
      final int storageId = req.getStorageId();

      auditContext.withAppId(appId);
      auditContext.withShuffleId(shuffleId);
      final String auditArgs =
          "requestId=" + req.getRequestId()
              + ", partitionId=" + partitionId
              + ", partitionNumPerRange=" + numPerRange
              + ", partitionNum=" + totalPartitions
              + ", offset=" + readOffset
              + ", length=" + readLength
              + ", storageId=" + storageId;
      auditContext.withArgs(auditArgs);

      // Guard 1: the request must pass verification before any resource is touched.
      final StatusCode verified = verifyRequest(appId);
      if (verified != StatusCode.SUCCESS) {
        auditContext.withStatusCode(verified);
        final GetLocalShuffleDataResponse rejection =
            new GetLocalShuffleDataResponse(
                req.getRequestId(),
                verified,
                verified.toString(),
                new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
        client.getChannel().writeAndFlush(rejection);
        return;
      }

      // Transport time is only recorded for a positive timestamp and a positive elapsed time.
      final long sentAt = req.getTimestamp();
      final long inFlightMillis = sentAt > 0 ? System.currentTimeMillis() - sentAt : 0L;
      if (inFlightMillis > 0) {
        shuffleServer
            .getNettyMetrics()
            .recordTransportTime(GetLocalShuffleDataRequest.class.getName(), inFlightMillis);
      }

      final String storageType =
          shuffleServer.getShuffleServerConf().get(RssBaseConf.RSS_STORAGE_TYPE).name();
      final String requestInfo =
          new StringBuilder()
              .append("appId[")
              .append(appId)
              .append("], shuffleId[")
              .append(shuffleId)
              .append("], partitionId[")
              .append(partitionId)
              .append("], offset[")
              .append(readOffset)
              .append("], length[")
              .append(readLength)
              .append("]")
              .toString();

      // The first element of the partition range is what the read event is keyed on.
      final int[] partitionRange =
          ShuffleStorageUtils.getPartitionRange(partitionId, numPerRange, totalPartitions);
      final Storage selected =
          shuffleServer
              .getStorageManager()
              .selectStorage(
                  new ShuffleDataReadEvent(
                      appId, shuffleId, partitionId, partitionRange[0], req.getStorageId()));
      if (selected != null) {
        selected.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
      }

      // Guard 2: without reserved read memory the request is answered with NO_BUFFER.
      if (!shuffleServer.getShuffleBufferManager().requireReadMemory(readLength)) {
        final String noBufferMsg = "Can't require memory to get shuffle data";
        LOG.warn("{} for {}", noBufferMsg, requestInfo);
        final GetLocalShuffleDataResponse noBuffer =
            new GetLocalShuffleDataResponse(
                req.getRequestId(),
                StatusCode.NO_BUFFER,
                noBufferMsg,
                new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
        auditContext.withStatusCode(noBuffer.getStatusCode());
        client.getChannel().writeAndFlush(noBuffer);
        return;
      }

      ShuffleDataResult readResult = null;
      try {
        final long readStartedAt = System.currentTimeMillis();
        readResult =
            shuffleServer
                .getShuffleTaskManager()
                .getShuffleData(
                    appId,
                    shuffleId,
                    partitionId,
                    numPerRange,
                    totalPartitions,
                    storageType,
                    readOffset,
                    readLength,
                    storageId);
        ShuffleServerMetrics.counterTotalReadDataSize.inc(readResult.getDataLength());
        ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(readResult.getDataLength());
        ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.inc();
        ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.inc(readLength);
        final GetLocalShuffleDataResponse dataResponse =
            new GetLocalShuffleDataResponse(
                req.getRequestId(), verified, "OK", readResult.getManagedBuffer());
        // NOTE(review): judging by its name, the listener releases the reserved read memory and
        // records the read time once the write completes - confirm against its implementation.
        final ReleaseMemoryAndRecordReadTimeListener onWritten =
            new ReleaseMemoryAndRecordReadTimeListener(
                readStartedAt,
                readLength,
                readResult.getDataLength(),
                requestInfo,
                req,
                dataResponse,
                client);
        client.getChannel().writeAndFlush(dataResponse).addListener(onWritten);
        auditContext.withStatusCode(dataResponse.getStatusCode());
        auditContext.withReturnValue("len=" + readResult.getDataLength());
      } catch (Exception e) {
        // No listener will run for this request, so give the reserved memory back here.
        shuffleServer.getShuffleBufferManager().releaseReadMemory(readLength);
        if (readResult != null) {
          readResult.release();
        }
        final String errorMsg =
            "Error happened when get shuffle data for " + requestInfo + ", " + e.getMessage();
        LOG.error(errorMsg, e);
        final GetLocalShuffleDataResponse failure =
            new GetLocalShuffleDataResponse(
                req.getRequestId(),
                StatusCode.INTERNAL_ERROR,
                errorMsg,
                new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
        auditContext.withStatusCode(failure.getStatusCode());
        client.getChannel().writeAndFlush(failure);
      }
    }
  }