public void getShuffleAssignments()

in coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java [141:231]


  /**
   * Computes the partition-range to shuffle-server assignment for one shuffle and returns it to the
   * client as a single gRPC response.
   *
   * <p>The request parameters are recorded in the RPC audit context and logged. If the cluster
   * manager reports that it is not ready to serve, or if computing the assignment fails for any
   * reason, an {@code INTERNAL_ERROR} response carrying the failure message is sent instead of an
   * assignment. In both the success and failure cases exactly one response is passed to {@code
   * onNext}, followed by {@code onCompleted}.
   *
   * @param request the assignment request: application id, shuffle id, partition count, partitions
   *     per range, replica count, required server tags, requested server number, estimated task
   *     concurrency and the server ids the client reported as faulty
   * @param responseObserver receives the assignment response, or the error response on failure
   */
  public void getShuffleAssignments(
      GetShuffleServerRequest request,
      StreamObserver<GetShuffleAssignmentsResponse> responseObserver) {
    try (CoordinatorRpcAuditContext auditContext = createAuditContext("getShuffleAssignments")) {
      final String appId = request.getApplicationId();
      final int shuffleId = request.getShuffleId();
      final int partitionNum = request.getPartitionNum();
      final int partitionNumPerRange = request.getPartitionNumPerRange();
      final int replica = request.getDataReplica();
      final Set<String> requiredTags = Sets.newHashSet(request.getRequireTagsList());
      final int requiredShuffleServerNumber = request.getAssignmentShuffleServerNumber();
      final int estimateTaskConcurrency = request.getEstimateTaskConcurrency();
      final Set<String> faultyServerIds = new HashSet<>(request.getFaultyServerIdsList());

      auditContext.withAppId(appId);
      auditContext.withArgs(
          String.format(
              "shuffleId=%d, partitionNum=%d, partitionNumPerRange=%d, replica=%d, requiredTags=%s, "
                  + "requiredShuffleServerNumber=%d, faultyServerIds=%s, stageId=%d, stageAttemptNumber=%d, isReassign=%b",
              shuffleId,
              partitionNum,
              partitionNumPerRange,
              replica,
              requiredTags,
              requiredShuffleServerNumber,
              faultyServerIds,
              request.getStageId(),
              request.getStageAttemptNumber(),
              request.getReassign()));

      // Only the number of faulty servers is logged here; the full id set goes into the audit args.
      LOG.info(
          "Request of getShuffleAssignments for appId[{}], shuffleId[{}], partitionNum[{}],"
              + " partitionNumPerRange[{}], replica[{}], requiredTags[{}], requiredShuffleServerNumber[{}],"
              + " faultyServerIdsNum[{}], stageId[{}], stageAttemptNumber[{}], isReassign[{}]",
          appId,
          shuffleId,
          partitionNum,
          partitionNumPerRange,
          replica,
          requiredTags,
          requiredShuffleServerNumber,
          faultyServerIds.size(),
          request.getStageId(),
          request.getStageAttemptNumber(),
          request.getReassign());

      // The response is decided first and sent exactly once afterwards. A unary call accepts only
      // one response, so a failing onNext must never be followed by a second onNext from the
      // error path.
      GetShuffleAssignmentsResponse response;
      try {
        if (!coordinatorServer.getClusterManager().isReadyForServe()) {
          throw new IllegalStateException("Coordinator is out-of-service when in starting.");
        }

        final PartitionRangeAssignment pra =
            coordinatorServer
                .getAssignmentStrategy()
                .assign(
                    partitionNum,
                    partitionNumPerRange,
                    replica,
                    requiredTags,
                    requiredShuffleServerNumber,
                    estimateTaskConcurrency,
                    faultyServerIds);
        response = CoordinatorUtils.toGetShuffleAssignmentsResponse(pra);
        logAssignmentResult(appId, shuffleId, pra);
      } catch (Exception e) {
        LOG.error(
            "Errors on getting shuffle assignments for app: {}, shuffleId: {}, partitionNum: {}, "
                + "partitionNumPerRange: {}, replica: {}, requiredTags: {}",
            appId,
            shuffleId,
            partitionNum,
            partitionNumPerRange,
            replica,
            requiredTags,
            e);
        // Many exceptions carry no message. A protobuf-generated string setter throws on null,
        // which would leave the client without any response, so fall back to the exception's
        // toString().
        final String retMsg = e.getMessage() != null ? e.getMessage() : e.toString();
        response =
            GetShuffleAssignmentsResponse.newBuilder()
                .setStatus(StatusCode.INTERNAL_ERROR)
                .setRetMsg(retMsg)
                .build();
      }

      auditContext.withStatusCode(response.getStatus());
      responseObserver.onNext(response);
      responseObserver.onCompleted();
    }
  }