public void handleGetLocalShuffleIndexRequest()

in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [298:378]


  public void handleGetLocalShuffleIndexRequest(
      TransportClient client, GetLocalShuffleIndexRequest req) {
    String appId = req.getAppId();
    int shuffleId = req.getShuffleId();
    int partitionId = req.getPartitionId();
    int partitionNumPerRange = req.getPartitionNumPerRange();
    int partitionNum = req.getPartitionNum();
    StatusCode status = StatusCode.SUCCESS;
    String msg = "OK";
    GetLocalShuffleIndexResponse response;
    String requestInfo =
        "appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]";

    int[] range =
        ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum);
    Storage storage =
        shuffleServer
            .getStorageManager()
            .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0]));
    if (storage != null) {
      storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
    }
    // Index file is expected small size and won't cause oom problem with the assumed size. An index
    // segment is 40B,
    // with the default size - 2MB, it can support 50k blocks for shuffle data.
    long assumedFileSize =
        shuffleServer
            .getShuffleServerConf()
            .getLong(ShuffleServerConf.SERVER_SHUFFLE_INDEX_SIZE_HINT);
    if (shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(assumedFileSize)) {
      try {
        final long start = System.currentTimeMillis();
        ShuffleIndexResult shuffleIndexResult =
            shuffleServer
                .getShuffleTaskManager()
                .getShuffleIndex(appId, shuffleId, partitionId, partitionNumPerRange, partitionNum);

        ByteBuffer data = shuffleIndexResult.getIndexData();
        ShuffleServerMetrics.counterTotalReadDataSize.inc(data.remaining());
        ShuffleServerMetrics.counterTotalReadLocalIndexFileSize.inc(data.remaining());
        response =
            new GetLocalShuffleIndexResponse(
                req.getRequestId(),
                status,
                msg,
                Unpooled.wrappedBuffer(data),
                shuffleIndexResult.getDataFileLen());
        long readTime = System.currentTimeMillis() - start;
        LOG.info(
            "Successfully getShuffleIndex cost {} ms for {}" + " bytes with {}",
            readTime,
            data.remaining(),
            requestInfo);
      } catch (FileNotFoundException indexFileNotFoundException) {
        LOG.warn(
            "Index file for {} is not found, maybe the data has been flushed to cold storage.",
            requestInfo,
            indexFileNotFoundException);
        response =
            new GetLocalShuffleIndexResponse(
                req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
      } catch (Exception e) {
        status = StatusCode.INTERNAL_ERROR;
        msg = "Error happened when get shuffle index for " + requestInfo + ", " + e.getMessage();
        LOG.error(msg, e);
        response =
            new GetLocalShuffleIndexResponse(
                req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
      } finally {
        shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
      }
    } else {
      status = StatusCode.INTERNAL_ERROR;
      msg = "Can't require memory to get shuffle index";
      LOG.error(msg + " for " + requestInfo);
      response =
          new GetLocalShuffleIndexResponse(
              req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
    }
    client.getChannel().writeAndFlush(response);
  }