in internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java [203:283]
/**
 * Asks the shuffle server to pre-allocate {@code requireSize} bytes of buffer for the given
 * partitions, retrying with a jittered exponential back-off while the server answers {@code
 * NO_BUFFER}.
 *
 * <p>The request is sent once up front and then re-sent at most {@code retryMax} more times.
 * Between two attempts the calling thread sleeps for {@code 2000 * 2^min(retry, 16)}
 * milliseconds plus a random jitter below 2000, capped at {@code retryIntervalMax}.
 *
 * @param appId application id placed in the request
 * @param shuffleId shuffle id placed in the request
 * @param partitionIds partition ids placed in the request
 * @param requireSize number of bytes to require
 * @param retryMax maximum number of re-sends after the first attempt
 * @param retryIntervalMax upper bound, in milliseconds, of the sleep between two attempts
 * @return the require-buffer id carried by the response when the status is {@code SUCCESS};
 *     {@code FAILED_REQUIRE_ID} when the retries are exhausted, when the calling thread is
 *     interrupted while backing off, or when the server answers with any other status that is
 *     not {@code NO_REGISTER}
 * @throws NotRetryException if the server answers {@code NO_REGISTER}
 */
public long requirePreAllocation(
    String appId,
    int shuffleId,
    List<Integer> partitionIds,
    int requireSize,
    int retryMax,
    long retryIntervalMax) {
  RequireBufferRequest rpcRequest =
      RequireBufferRequest.newBuilder()
          .setShuffleId(shuffleId)
          .addAllPartitionIds(partitionIds)
          .setAppId(appId)
          .setRequireSize(requireSize)
          .build();
  long start = System.currentTimeMillis();
  RequireBufferResponse rpcResponse = getBlockingStub().requireBuffer(rpcRequest);
  int retry = 0;
  long result = FAILED_REQUIRE_ID;
  Random random = new Random();
  final int backOffBase = 2000;
  while (rpcResponse.getStatus() == RssProtos.StatusCode.NO_BUFFER) {
    if (retry >= retryMax) {
      LOG.warn(
          "ShuffleServer {}:{} is full and can't send shuffle data successfully after retry {}"
              + " times, cost: {}(ms)",
          host,
          port,
          retryMax,
          System.currentTimeMillis() - start);
      return result;
    }
    // Only announce a retry once we know one is actually going to happen.
    LOG.info(
        "Can't require {} bytes from {}:{}, sleep and try[{}] again",
        requireSize,
        host,
        port,
        retry);
    // The shift is capped at 16 so the exponential term stays far away from overflow; the
    // outer max() keeps a negative retryIntervalMax from making Thread.sleep throw.
    long backoffTime =
        Math.max(
            0L,
            Math.min(
                retryIntervalMax,
                backOffBase * (1L << Math.min(retry, 16)) + random.nextInt(backOffBase)));
    try {
      Thread.sleep(backoffTime);
    } catch (InterruptedException e) {
      // Restore the flag for the caller and stop retrying: somebody asked this thread to
      // stop, and issuing further blocking RPCs on an interrupted thread is pointless.
      Thread.currentThread().interrupt();
      LOG.warn("Interrupted when require pre allocation from {}:{}", host, port, e);
      return result;
    }
    rpcResponse = getBlockingStub().requireBuffer(rpcRequest);
    retry++;
  }
  if (rpcResponse.getStatus() == RssProtos.StatusCode.SUCCESS) {
    LOG.debug(
        "Require preAllocated size of {} from {}:{}, cost: {}(ms)",
        requireSize,
        host,
        port,
        System.currentTimeMillis() - start);
    result = rpcResponse.getRequireBufferId();
  } else if (rpcResponse.getStatus() == RssProtos.StatusCode.NO_REGISTER) {
    String msg =
        "Can't require "
            + requireSize
            + " bytes from "
            + host
            + ":"
            + port
            + ", statusCode="
            + rpcResponse.getStatus()
            + ", errorMsg:"
            + rpcResponse.getRetMsg();
    throw new NotRetryException(msg);
  }
  // Any other status falls through and reports FAILED_REQUIRE_ID.
  return result;
}