public long requirePreAllocation()

in internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java [203:283]


  public long requirePreAllocation(
      String appId,
      int shuffleId,
      List<Integer> partitionIds,
      int requireSize,
      int retryMax,
      long retryIntervalMax) {
    RequireBufferRequest rpcRequest =
        RequireBufferRequest.newBuilder()
            .setShuffleId(shuffleId)
            .addAllPartitionIds(partitionIds)
            .setAppId(appId)
            .setRequireSize(requireSize)
            .build();

    long start = System.currentTimeMillis();
    RequireBufferResponse rpcResponse = getBlockingStub().requireBuffer(rpcRequest);
    int retry = 0;
    long result = FAILED_REQUIRE_ID;
    Random random = new Random();
    final int backOffBase = 2000;
    while (rpcResponse.getStatus() == RssProtos.StatusCode.NO_BUFFER) {
      LOG.info(
          "Can't require "
              + requireSize
              + " bytes from "
              + host
              + ":"
              + port
              + ", sleep and try["
              + retry
              + "] again");
      if (retry >= retryMax) {
        LOG.warn(
            "ShuffleServer "
                + host
                + ":"
                + port
                + " is full and can't send shuffle"
                + " data successfully after retry "
                + retryMax
                + " times, cost: {}(ms)",
            System.currentTimeMillis() - start);
        return result;
      }
      try {
        long backoffTime =
            Math.min(
                retryIntervalMax,
                backOffBase * (1L << Math.min(retry, 16)) + random.nextInt(backOffBase));
        Thread.sleep(backoffTime);
      } catch (Exception e) {
        LOG.warn("Exception happened when require pre allocation from " + host + ":" + port, e);
      }
      rpcResponse = getBlockingStub().requireBuffer(rpcRequest);
      retry++;
    }
    if (rpcResponse.getStatus() == RssProtos.StatusCode.SUCCESS) {
      LOG.debug(
          "Require preAllocated size of {} from {}:{}, cost: {}(ms)",
          requireSize,
          host,
          port,
          System.currentTimeMillis() - start);
      result = rpcResponse.getRequireBufferId();
    } else if (rpcResponse.getStatus() == RssProtos.StatusCode.NO_REGISTER) {
      String msg =
          "Can't require "
              + requireSize
              + " bytes from "
              + host
              + ":"
              + port
              + ", statusCode="
              + rpcResponse.getStatus()
              + ", errorMsg:"
              + rpcResponse.getRetMsg();
      throw new NotRetryException(msg);
    }
    return result;
  }