public void getLocalShuffleIndex()

in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [678:761]


  /**
   * Handles a request for the local shuffle index of a single partition.
   *
   * <p>The method first updates read metrics on the storage selected for this read (when one is
   * selected), then tries to reserve read memory sized by {@code
   * ShuffleServerConf.SERVER_SHUFFLE_INDEX_SIZE_HINT}. Exactly one reply is sent, followed by
   * {@code onCompleted()}:
   *
   * <ul>
   *   <li>memory could not be reserved: {@code INTERNAL_ERROR} with an explanatory message;
   *   <li>index read succeeded: {@code SUCCESS}, message {@code "OK"}, the index bytes and the
   *       data file length;
   *   <li>index file not found: {@code SUCCESS} with no message and no index data;
   *   <li>any other exception: {@code INTERNAL_ERROR} with a message that includes the exception
   *       message.
   * </ul>
   *
   * <p>When memory was reserved it is released in a {@code finally} block, i.e. before the reply
   * is handed to {@code responseObserver}.
   *
   * @param request carries the app id, shuffle id, partition id and the partition range layout
   * @param responseObserver receives the single reply and the completion signal
   */
  public void getLocalShuffleIndex(
      GetLocalShuffleIndexRequest request,
      StreamObserver<GetLocalShuffleIndexResponse> responseObserver) {
    final String appId = request.getAppId();
    final int shuffleId = request.getShuffleId();
    final int partitionId = request.getPartitionId();
    final int partitionNumPerRange = request.getPartitionNumPerRange();
    final int partitionNum = request.getPartitionNum();
    final String requestInfo =
        "appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]";

    // Update read metrics on the storage selected for this read event, if one was selected.
    final int[] partitionRange =
        ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum);
    final ShuffleDataReadEvent readEvent =
        new ShuffleDataReadEvent(appId, shuffleId, partitionId, partitionRange[0]);
    final Storage selectedStorage = shuffleServer.getStorageManager().selectStorage(readEvent);
    if (selectedStorage != null) {
      selectedStorage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
    }

    // The index file is expected to be small, so a configured size hint is reserved instead of
    // the real file size and should not cause OOM. An index segment is 40B; with the default
    // hint of 2MB this supports 50k blocks of shuffle data.
    final long reservedBytes =
        shuffleServer
            .getShuffleServerConf()
            .getLong(ShuffleServerConf.SERVER_SHUFFLE_INDEX_SIZE_HINT);

    GetLocalShuffleIndexResponse reply;
    if (!shuffleServer.getShuffleBufferManager().requireReadMemoryWithRetry(reservedBytes)) {
      // No read memory available: answer with an error without touching the index file.
      final String failure = "Can't require memory to get shuffle index";
      LOG.error(failure + " for " + requestInfo);
      reply =
          GetLocalShuffleIndexResponse.newBuilder()
              .setStatus(StatusCode.INTERNAL_ERROR.toProto())
              .setRetMsg(failure)
              .build();
    } else {
      try {
        final long startMs = System.currentTimeMillis();
        final ShuffleIndexResult indexResult =
            shuffleServer
                .getShuffleTaskManager()
                .getShuffleIndex(appId, shuffleId, partitionId, partitionNumPerRange, partitionNum);
        final long elapsedMs = System.currentTimeMillis() - startMs;

        final ByteBuffer indexData = indexResult.getIndexData();
        final int indexBytes = indexData.remaining();
        ShuffleServerMetrics.counterTotalReadDataSize.inc(indexBytes);
        ShuffleServerMetrics.counterTotalReadLocalIndexFileSize.inc(indexBytes);
        LOG.info(
            "Successfully getShuffleIndex cost {} ms for {} bytes with {}",
            elapsedMs,
            indexBytes,
            requestInfo);

        reply =
            GetLocalShuffleIndexResponse.newBuilder()
                .setStatus(StatusCode.SUCCESS.toProto())
                .setRetMsg("OK")
                .setIndexData(UnsafeByteOperations.unsafeWrap(indexData))
                .setDataFileLen(indexResult.getDataFileLen())
                .build();
      } catch (FileNotFoundException missingIndexFile) {
        // A missing index file is reported as SUCCESS with an empty payload, not as an error.
        LOG.warn(
            "Index file for {} is not found, maybe the data has been flushed to cold storage.",
            requestInfo,
            missingIndexFile);
        reply =
            GetLocalShuffleIndexResponse.newBuilder()
                .setStatus(StatusCode.SUCCESS.toProto())
                .build();
      } catch (Exception e) {
        final String failure =
            "Error happened when get shuffle index for " + requestInfo + ", " + e.getMessage();
        LOG.error(failure, e);
        reply =
            GetLocalShuffleIndexResponse.newBuilder()
                .setStatus(StatusCode.INTERNAL_ERROR.toProto())
                .setRetMsg(failure)
                .build();
      } finally {
        // Always give back the reservation taken above, whatever the outcome of the read.
        shuffleServer.getShuffleBufferManager().releaseReadMemory(reservedBytes);
      }
    }

    responseObserver.onNext(reply);
    responseObserver.onCompleted();
  }