in coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java [141:231]
/**
 * Handles the {@code getShuffleAssignments} RPC.
 *
 * <p>Reads the assignment parameters from the request, records them in the audit context and the
 * service log, asks the coordinator's assignment strategy for a partition-range assignment and
 * sends the converted result to the caller. Any exception raised while computing or sending the
 * assignment is logged and answered with an {@code INTERNAL_ERROR} response instead of being
 * propagated. The response stream is always completed, whether or not a response was sent.
 *
 * @param request the assignment request carrying the application id, shuffle id, partition
 *     layout, replica count, required tags and the faulty server ids passed to the strategy
 * @param responseObserver the stream that receives exactly one response (success or error) and
 *     is then completed
 */
public void getShuffleAssignments(
    GetShuffleServerRequest request,
    StreamObserver<GetShuffleAssignmentsResponse> responseObserver) {
  // The audit context is closed automatically when the handler exits (try-with-resources).
  try (CoordinatorRpcAuditContext auditContext = createAuditContext("getShuffleAssignments")) {
    final String appId = request.getApplicationId();
    final int shuffleId = request.getShuffleId();
    final int partitionNum = request.getPartitionNum();
    final int partitionNumPerRange = request.getPartitionNumPerRange();
    final int replica = request.getDataReplica();
    final Set<String> requiredTags = Sets.newHashSet(request.getRequireTagsList());
    final int requiredShuffleServerNumber = request.getAssignmentShuffleServerNumber();
    final int estimateTaskConcurrency = request.getEstimateTaskConcurrency();
    final Set<String> faultyServerIds = new HashSet<>(request.getFaultyServerIdsList());
    auditContext.withAppId(appId);
    // The audit record carries the full set of faulty server ids.
    auditContext.withArgs(
        String.format(
            "shuffleId=%d, partitionNum=%d, partitionNumPerRange=%d, replica=%d, requiredTags=%s, "
                + "requiredShuffleServerNumber=%d, faultyServerIds=%s, stageId=%d, stageAttemptNumber=%d, isReassign=%b",
            shuffleId,
            partitionNum,
            partitionNumPerRange,
            replica,
            requiredTags,
            requiredShuffleServerNumber,
            faultyServerIds,
            request.getStageId(),
            request.getStageAttemptNumber(),
            request.getReassign()));
    // Unlike the audit record, the service log only prints how many faulty servers were reported.
    LOG.info(
        "Request of getShuffleAssignments for appId[{}], shuffleId[{}], partitionNum[{}],"
            + " partitionNumPerRange[{}], replica[{}], requiredTags[{}], requiredShuffleServerNumber[{}],"
            + " faultyServerIds[{}], stageId[{}], stageAttemptNumber[{}], isReassign[{}]",
        appId,
        shuffleId,
        partitionNum,
        partitionNumPerRange,
        replica,
        requiredTags,
        requiredShuffleServerNumber,
        faultyServerIds.size(),
        request.getStageId(),
        request.getStageAttemptNumber(),
        request.getReassign());
    GetShuffleAssignmentsResponse response = null;
    try {
      // Refuse to hand out assignments until the cluster manager reports it is ready to serve.
      if (!coordinatorServer.getClusterManager().isReadyForServe()) {
        throw new Exception("Coordinator is out-of-service when in starting.");
      }
      final PartitionRangeAssignment pra =
          coordinatorServer
              .getAssignmentStrategy()
              .assign(
                  partitionNum,
                  partitionNumPerRange,
                  replica,
                  requiredTags,
                  requiredShuffleServerNumber,
                  estimateTaskConcurrency,
                  faultyServerIds);
      response = CoordinatorUtils.toGetShuffleAssignmentsResponse(pra);
      logAssignmentResult(appId, shuffleId, pra);
      responseObserver.onNext(response);
    } catch (Exception e) {
      LOG.error(
          "Errors on getting shuffle assignments for app: {}, shuffleId: {}, partitionNum: {}, "
              + "partitionNumPerRange: {}, replica: {}, requiredTags: {}",
          appId,
          shuffleId,
          partitionNum,
          partitionNumPerRange,
          replica,
          requiredTags,
          e);
      // Throwable#getMessage() may be null (e.g. an exception created without a message) and
      // protobuf string setters reject null. Fall back to the exception's toString() so that an
      // error response is always built and delivered instead of failing inside this handler.
      final String errorMessage = e.getMessage() != null ? e.getMessage() : e.toString();
      response =
          GetShuffleAssignmentsResponse.newBuilder()
              .setStatus(StatusCode.INTERNAL_ERROR)
              .setRetMsg(errorMessage)
              .build();
      responseObserver.onNext(response);
    } finally {
      // response stays null only if neither the success nor the error response could be built.
      if (response != null) {
        auditContext.withStatusCode(response.getStatus());
      }
      responseObserver.onCompleted();
    }
  }
}