public void handleGetMemoryShuffleDataRequest()

in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [413:522]


  public void handleGetMemoryShuffleDataRequest(
      TransportClient client, GetMemoryShuffleDataRequest req) {
    try (ServerRpcAuditContext auditContext = createAuditContext("getMemoryShuffleData", client)) {
      String appId = req.getAppId();
      int shuffleId = req.getShuffleId();
      int partitionId = req.getPartitionId();
      long blockId = req.getLastBlockId();
      int readBufferSize = req.getReadBufferSize();
      auditContext.withAppId(appId).withShuffleId(shuffleId);
      auditContext.withArgs(
          "requestId="
              + req.getRequestId()
              + ", partitionId="
              + partitionId
              + ", blockId="
              + blockId
              + ", readBufferSize="
              + readBufferSize);
      StatusCode status = verifyRequest(appId);
      if (status != StatusCode.SUCCESS) {
        auditContext.withStatusCode(status);
        GetMemoryShuffleDataResponse response =
            new GetMemoryShuffleDataResponse(
                req.getRequestId(),
                status,
                status.toString(),
                Lists.newArrayList(),
                Unpooled.EMPTY_BUFFER);
        client.getChannel().writeAndFlush(response);
        return;
      }

      long timestamp = req.getTimestamp();
      if (timestamp > 0) {
        long transportTime = System.currentTimeMillis() - timestamp;
        if (transportTime > 0) {
          shuffleServer
              .getNettyMetrics()
              .recordTransportTime(GetMemoryShuffleDataRequest.class.getName(), transportTime);
        }
      }
      String msg = "OK";
      GetMemoryShuffleDataResponse response;
      String requestInfo =
          "appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]";

      // todo: if can get the exact memory size?
      if (shuffleServer.getShuffleBufferManager().requireReadMemory(readBufferSize)) {
        ShuffleDataResult shuffleDataResult = null;
        try {
          final long start = System.currentTimeMillis();
          shuffleDataResult =
              shuffleServer
                  .getShuffleTaskManager()
                  .getInMemoryShuffleData(
                      appId,
                      shuffleId,
                      partitionId,
                      blockId,
                      readBufferSize,
                      req.getExpectedTaskIdsBitmap());
          ManagedBuffer data = NettyManagedBuffer.EMPTY_BUFFER;
          List<BufferSegment> bufferSegments = Lists.newArrayList();
          if (shuffleDataResult != null) {
            data = shuffleDataResult.getManagedBuffer();
            bufferSegments = shuffleDataResult.getBufferSegments();
            ShuffleServerMetrics.counterTotalReadDataSize.inc(data.size());
            ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.size());
            ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.inc();
            ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.inc(readBufferSize);
          }
          auditContext.withStatusCode(status);
          auditContext.withReturnValue(
              "len=" + data.size() + ", bufferSegments=" + bufferSegments.size());
          response =
              new GetMemoryShuffleDataResponse(
                  req.getRequestId(), status, msg, bufferSegments, data);
          ReleaseMemoryAndRecordReadTimeListener listener =
              new ReleaseMemoryAndRecordReadTimeListener(
                  start, readBufferSize, data.size(), requestInfo, req, response, client);
          client.getChannel().writeAndFlush(response).addListener(listener);
          return;
        } catch (Exception e) {
          shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
          if (shuffleDataResult != null) {
            shuffleDataResult.release();
          }
          status = StatusCode.INTERNAL_ERROR;
          msg =
              "Error happened when get in memory shuffle data for "
                  + requestInfo
                  + ", "
                  + e.getMessage();
          LOG.error(msg, e);
          response =
              new GetMemoryShuffleDataResponse(
                  req.getRequestId(), status, msg, Lists.newArrayList(), Unpooled.EMPTY_BUFFER);
        }
      } else {
        status = StatusCode.NO_BUFFER;
        msg = "Can't require memory to get in memory shuffle data";
        LOG.warn("{} for {}", msg, requestInfo);
        response =
            new GetMemoryShuffleDataResponse(
                req.getRequestId(), status, msg, Lists.newArrayList(), Unpooled.EMPTY_BUFFER);
      }
      auditContext.withStatusCode(response.getStatusCode());
      client.getChannel().writeAndFlush(response);
    }
  }