in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [753:838]
// Handles the requireBuffer RPC.
//
// Flow:
//   1. Record the app id, shuffle id and a summary of the request arguments on the audit context.
//   2. Run verifyRequest(appId); on any status other than SUCCESS, reply with that status
//      (its string form is used as the return message) and stop.
//   3. Ask the ShuffleTaskManager for a buffer. A request with an empty app id goes through the
//      size-only requireBuffer call; every other request goes through requireBufferReturnPair,
//      which also yields the list of partition ids reported as "need split".
//   4. Map the known failure exceptions to a status code and message, bump the matching metrics,
//      and always answer the caller exactly once.
//
// When the allocation fails with one of the handled exceptions, the response carries
// requireBufferId = -1 and an empty need-split list.
public void requireBuffer(
    RequireBufferRequest request, StreamObserver<RequireBufferResponse> responseObserver) {
  try (ServerRpcAuditContext auditContext = createAuditContext("requireBuffer")) {
    final String appId = request.getAppId();
    auditContext.withAppId(appId).withShuffleId(request.getShuffleId());

    // Build the audit argument summary; partition details are appended in one go.
    String argsSummary = "requireSize=" + request.getRequireSize();
    if (request.getPartitionIdsList() != null) {
      argsSummary +=
          ", partitionIdsSize="
              + request.getPartitionIdsList().size()
              + ", partitionIds="
              + OutputUtils.listToSegment(request.getPartitionIdsList(), 1, 10);
    }
    auditContext.withArgs(argsSummary);

    StatusCode status = verifyRequest(appId);
    if (status != StatusCode.SUCCESS) {
      // Verification failed: report the status as-is and skip the allocation entirely.
      auditContext.withStatusCode(status);
      responseObserver.onNext(
          RequireBufferResponse.newBuilder()
              .setStatus(status.toProto())
              .setRetMsg(status.toString())
              .build());
      responseObserver.onCompleted();
      return;
    }

    // Defaults that are sent back when the allocation below fails.
    long bufferId = -1;
    String retMsg = "";
    List<Integer> splitPartitionIds = Collections.emptyList();
    try {
      if (!StringUtils.isEmpty(appId)) {
        Pair<Long, List<Integer>> allocation =
            shuffleServer
                .getShuffleTaskManager()
                .requireBufferReturnPair(
                    appId,
                    request.getShuffleId(),
                    request.getPartitionIdsList(),
                    request.getPartitionRequireSizesList(),
                    request.getRequireSize());
        bufferId = allocation.getLeft();
        splitPartitionIds = allocation.getRight();
      } else {
        // Older client versions send no app id; keep serving them via the size-only call.
        bufferId = shuffleServer.getShuffleTaskManager().requireBuffer(request.getRequireSize());
      }
    } catch (NoBufferException e) {
      status = StatusCode.NO_BUFFER;
      retMsg = e.getMessage();
      ShuffleServerMetrics.counterTotalRequireBufferFailedForRegularPartition.inc();
      ShuffleServerMetrics.counterTotalRequireBufferFailed.inc();
    } catch (NoBufferForHugePartitionException e) {
      status = StatusCode.NO_BUFFER_FOR_HUGE_PARTITION;
      retMsg = e.getMessage();
      ShuffleServerMetrics.counterTotalRequireBufferFailedForHugePartition.inc();
      ShuffleServerMetrics.counterTotalRequireBufferFailed.inc();
    } catch (NoRegisterException e) {
      status = StatusCode.NO_REGISTER;
      retMsg = e.getMessage();
      ShuffleServerMetrics.counterTotalRequireBufferFailed.inc();
    } catch (ExceedHugePartitionHardLimitException e) {
      status = StatusCode.EXCEED_HUGE_PARTITION_HARD_LIMIT;
      ShuffleServerMetrics.counterTotalHugePartitionExceedHardLimitNum.inc();
      ShuffleServerMetrics.counterTotalRequireBufferFailed.inc();
      // Only this branch needs the app/shuffle description, so it is assembled here.
      String shuffleDescription =
          "appId[" + appId + "], shuffleId[" + request.getShuffleId() + "]";
      retMsg =
          "ExceedHugePartitionHardLimitException Error happened when requireBuffer for "
              + shuffleDescription
              + ": "
              + e.getMessage();
      LOG.error(retMsg);
    }

    auditContext.withStatusCode(status);
    auditContext.withReturnValue("requireBufferId=" + bufferId);
    responseObserver.onNext(
        RequireBufferResponse.newBuilder()
            .setStatus(status.toProto())
            .setRequireBufferId(bufferId)
            .setRetMsg(retMsg)
            .addAllNeedSplitPartitionIds(splitPartitionIds)
            .build());
    responseObserver.onCompleted();
  }
}