public void getLocalShuffleIndex()

in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [1268:1393]


  /**
   * Handles a request for the index data of a shuffle partition and answers it on the given
   * observer.
   *
   * <p>Exactly one response is sent on every path that returns normally:
   *
   * <ul>
   *   <li>the status returned by {@code verifyRequest} when it is not {@code SUCCESS};
   *   <li>{@code NO_BUFFER} when read memory of the configured index size hint cannot be
   *       acquired;
   *   <li>{@code INTERNAL_ERROR} when reading the index fails with an unexpected exception;
   *   <li>{@code SUCCESS} without index data when the index file is not found;
   *   <li>{@code SUCCESS} with index data, data file length and storage ids otherwise.
   * </ul>
   *
   * @param request carries the app id, shuffle id, partition id and partition range settings
   * @param responseObserver receives the single reply and the completion signal
   */
  public void getLocalShuffleIndex(
      GetLocalShuffleIndexRequest request,
      StreamObserver<GetLocalShuffleIndexResponse> responseObserver) {
    try (ServerRpcAuditContext auditContext = createAuditContext("getLocalShuffleIndex")) {
      String appId = request.getAppId();
      int shuffleId = request.getShuffleId();
      int partitionId = request.getPartitionId();
      int partitionNumPerRange = request.getPartitionNumPerRange();
      int partitionNum = request.getPartitionNum();
      auditContext.withAppId(appId).withShuffleId(shuffleId);
      auditContext.withArgs(
          "partitionId="
              + partitionId
              + ", partitionNumPerRange="
              + partitionNumPerRange
              + ", partitionNum="
              + partitionNum);

      // Reject the request early when verification fails; nothing has been acquired yet.
      StatusCode status = verifyRequest(appId);
      if (status != StatusCode.SUCCESS) {
        auditContext.withStatusCode(status);
        GetLocalShuffleIndexResponse reply =
            GetLocalShuffleIndexResponse.newBuilder()
                .setStatus(status.toProto())
                .setRetMsg(status.toString())
                .build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
        return;
      }

      String msg = "OK";
      GetLocalShuffleIndexResponse reply;
      String requestInfo =
          "appId[" + appId + "], shuffleId[" + shuffleId + "], partitionId[" + partitionId + "]";

      int[] range =
          ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum);
      Storage storage =
          shuffleServer
              .getStorageManager()
              .selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0]));
      if (storage != null) {
        storage.updateReadMetrics(new StorageReadMetrics(appId, shuffleId));
      }
      // The index file is expected to be small and should not cause OOM problems with the
      // assumed size. An index segment is 40B, so with the default size hint of 2MB it can
      // support 50k blocks of shuffle data.
      long assumedFileSize =
          shuffleServer
              .getShuffleServerConf()
              .getLong(ShuffleServerConf.SERVER_SHUFFLE_INDEX_SIZE_HINT);
      if (shuffleServer.getShuffleBufferManager().requireReadMemory(assumedFileSize)) {
        ShuffleIndexResult shuffleIndexResult = null;
        // Records whether the in-flight gauges were incremented, so the finally block only
        // decrements what was actually incremented. Without it, a failure between obtaining the
        // result and the increments would drive the gauges below their true value.
        boolean readGaugesIncremented = false;
        try {
          final long start = System.currentTimeMillis();
          shuffleIndexResult =
              shuffleServer
                  .getShuffleTaskManager()
                  .getShuffleIndex(
                      appId, shuffleId, partitionId, partitionNumPerRange, partitionNum);

          ByteBuffer data = shuffleIndexResult.getIndexData();
          ShuffleServerMetrics.counterTotalReadDataSize.inc(data.remaining());
          ShuffleServerMetrics.counterTotalReadLocalIndexFileSize.inc(data.remaining());
          ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.inc();
          ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.inc(assumedFileSize);
          readGaugesIncremented = true;
          GetLocalShuffleIndexResponse.Builder builder =
              GetLocalShuffleIndexResponse.newBuilder().setStatus(status.toProto()).setRetMsg(msg);
          // NOTE(review): unsafeWrap shares the buffer with the reply instead of copying it, and
          // the result is released in the finally block below before the reply is handed to
          // onNext. Confirm the buffer content stays valid after release().
          builder.setIndexData(UnsafeByteOperations.unsafeWrap(data));
          builder.setDataFileLen(shuffleIndexResult.getDataFileLen());
          builder.addAllStorageIds(
              Arrays.stream(shuffleIndexResult.getStorageIds())
                  .boxed()
                  .collect(Collectors.toList()));
          long readTime = System.currentTimeMillis() - start;
          shuffleServer
              .getGrpcMetrics()
              .recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_INDEX_METHOD, readTime);
          LOG.info(
              "Successfully getShuffleIndex cost {} ms for {} bytes with {}",
              readTime,
              data.remaining(),
              requestInfo);
          auditContext.withReturnValue("len=" + shuffleIndexResult.getDataFileLen());
          reply = builder.build();
        } catch (FileNotFoundException indexFileNotFoundException) {
          // A missing index file is not reported as an error: the reply keeps the SUCCESS
          // status and simply carries no index data.
          LOG.warn(
              "Index file for {} is not found, maybe the data has been flushed to cold storage "
                  + "or still in memory buffer pool.",
              requestInfo,
              indexFileNotFoundException);
          reply = GetLocalShuffleIndexResponse.newBuilder().setStatus(status.toProto()).build();
        } catch (Exception e) {
          status = StatusCode.INTERNAL_ERROR;
          msg = "Error happened when get shuffle index for " + requestInfo + ", " + e.getMessage();
          LOG.error(msg, e);
          reply =
              GetLocalShuffleIndexResponse.newBuilder()
                  .setStatus(status.toProto())
                  .setRetMsg(msg)
                  .build();
        } finally {
          if (shuffleIndexResult != null) {
            shuffleIndexResult.release();
          }
          if (readGaugesIncremented) {
            ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.dec();
            ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.dec(assumedFileSize);
          }
          // Always give back the read memory acquired by requireReadMemory above.
          shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
        }
      } else {
        status = StatusCode.NO_BUFFER;
        msg = "Can't require memory to get shuffle index";
        LOG.warn("{} for {}", msg, requestInfo);
        reply =
            GetLocalShuffleIndexResponse.newBuilder()
                .setStatus(status.toProto())
                .setRetMsg(msg)
                .build();
      }
      auditContext.withStatusCode(status);
      responseObserver.onNext(reply);
      responseObserver.onCompleted();
    }
  }