public void registerShuffle()

in server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java [247:405]


  public void registerShuffle(
      ShuffleRegisterRequest req, StreamObserver<ShuffleRegisterResponse> responseObserver) {
    // Registers (appId, shuffleId) on this server. If a stage retry is detected
    // (stageAttemptNumber > 0 and newer than the latest recorded attempt), the previous
    // attempt's shuffle data is deleted first. A non-newer attempt is answered with
    // STAGE_RETRY_IGNORE. Every normal return path sends exactly one response.
    try (ServerRpcAuditContext auditContext = createAuditContext("registerShuffle")) {
      String appId = req.getAppId();
      int shuffleId = req.getShuffleId();
      int stageAttemptNumber = req.getStageAttemptNumber();
      String remoteStoragePath = req.getRemoteStorage().getPath();
      String user = req.getUser();
      auditContext.withAppId(appId).withShuffleId(shuffleId);
      auditContext.withArgs(
          "remoteStoragePath="
              + remoteStoragePath
              + ", user="
              + user
              + ", stageAttemptNumber="
              + stageAttemptNumber);

      // The stage-retry bookkeeping runs under the task-info lock inside the helper. The gRPC
      // reply is sent here, after that lock has been released.
      Optional<StatusCode> stageRetryStatus =
          clearPreviousStageAttemptIfNeeded(appId, shuffleId, stageAttemptNumber);
      if (stageRetryStatus.isPresent()) {
        replyRegisterShuffle(auditContext, responseObserver, stageRetryStatus.get());
        return;
      }

      // Protobuf enum getters never return null, so no null fallback is needed here.
      ShuffleDataDistributionType shuffleDataDistributionType =
          ShuffleDataDistributionType.valueOf(req.getShuffleDataDistribution().name());

      int maxConcurrencyPerPartitionToWrite = req.getMaxConcurrencyPerPartitionToWrite();

      // Duplicate keys keep the last value, matching protobuf map semantics. Without a merge
      // function, Collectors.toMap would throw IllegalStateException and the call would end
      // without any reply.
      Map<String, String> remoteStorageConf =
          req.getRemoteStorage().getRemoteStorageConfList().stream()
              .collect(
                  Collectors.toMap(
                      RemoteStorageConfItem::getKey,
                      RemoteStorageConfItem::getValue,
                      (previous, latest) -> latest));

      List<PartitionRange> partitionRanges = toPartitionRanges(req.getPartitionRangesList());
      // Client-supplied values are passed as arguments and never concatenated into the
      // pattern, so a literal "{}" in appId or the storage path cannot shift the placeholders.
      LOG.info(
          "Get register request for appId[{}], shuffleId[{}], remoteStorage[{}] with {} "
              + "partition ranges. User: {}",
          appId,
          shuffleId,
          remoteStoragePath,
          partitionRanges.size(),
          user);

      StatusCode result =
          shuffleServer
              .getShuffleTaskManager()
              .registerShuffle(
                  appId,
                  shuffleId,
                  stageAttemptNumber,
                  partitionRanges,
                  new RemoteStorageInfo(remoteStoragePath, remoteStorageConf),
                  user,
                  shuffleDataDistributionType,
                  maxConcurrencyPerPartitionToWrite,
                  req.getPropertiesMap());
      if (StatusCode.SUCCESS == result
          && shuffleServer.isRemoteMergeEnable()
          && req.hasMergeContext()) {
        // The merged block is in a different domain from the original block,
        // so a new app must be registered to hold the merged blocks.
        result =
            shuffleServer
                .getShuffleTaskManager()
                .registerShuffle(
                    appId + MERGE_APP_SUFFIX,
                    shuffleId,
                    stageAttemptNumber,
                    partitionRanges,
                    new RemoteStorageInfo(remoteStoragePath, remoteStorageConf),
                    user,
                    shuffleDataDistributionType,
                    maxConcurrencyPerPartitionToWrite,
                    req.getPropertiesMap());
        if (result == StatusCode.SUCCESS) {
          result =
              shuffleServer
                  .getShuffleMergeManager()
                  .registerShuffle(appId, shuffleId, req.getMergeContext());
        }
      }
      replyRegisterShuffle(auditContext, responseObserver, result);
    }
  }

  /**
   * Handles stage-retry bookkeeping for a shuffle registration.
   *
   * <p>If the app already has task info and {@code stageAttemptNumber > 0}:
   *
   * <ul>
   *   <li>A newer attempt is recorded, and the previous attempt's shuffle data is deleted
   *       synchronously.
   *   <li>A non-newer attempt is rejected.
   * </ul>
   *
   * <p>The task info is locked while this runs, so concurrent registrations of the same app
   * cannot interleave their attempt-number updates. If this is the first time the stage is
   * registered, there is no previously sent block data to delete.
   *
   * @param appId application id from the request
   * @param shuffleId shuffle id from the request
   * @param stageAttemptNumber stage attempt number from the request
   * @return the status to reply with immediately ({@code STAGE_RETRY_IGNORE} or {@code
   *     INTERNAL_ERROR}), or empty when registration should proceed
   */
  private Optional<StatusCode> clearPreviousStageAttemptIfNeeded(
      String appId, int shuffleId, int stageAttemptNumber) {
    ShuffleTaskInfo taskInfo = shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(appId);
    if (taskInfo == null) {
      return Optional.empty();
    }
    // Prevents concurrent attempts of the same stage from racing on the latest AttemptNumber.
    synchronized (taskInfo) {
      int lastAttemptNumber = taskInfo.getLatestStageAttemptNumber(shuffleId);
      if (stageAttemptNumber <= 0) {
        return Optional.empty();
      }
      if (stageAttemptNumber <= lastAttemptNumber) {
        // When a stage retry occurs, the first or last registration of a stage may need to be
        // ignored, and the ignored status is returned quickly.
        LOG.info(
            "Ignoring registration for app: {}, shuffleId: {} because stageAttemptNumber: {} "
                + "is not newer than the latest registered stageAttemptNumber: {}",
            appId,
            shuffleId,
            stageAttemptNumber,
            lastAttemptNumber);
        return Optional.of(StatusCode.STAGE_RETRY_IGNORE);
      }
      taskInfo.refreshLatestStageAttemptNumber(shuffleId, stageAttemptNumber);
      try {
        long start = System.currentTimeMillis();
        shuffleServer.getShuffleTaskManager().removeShuffleDataSyncRenameAndDelete(appId, shuffleId);
        LOG.info(
            "Deleted the previous stage attempt data due to stage recomputing for app: {}, "
                + "shuffleId: {}, stageAttemptNumber: {}. It costs {} ms",
            appId,
            shuffleId,
            lastAttemptNumber,
            System.currentTimeMillis() - start);
        // Check that no block metadata was left behind. The task info is looked up again so
        // that the current registered state is inspected.
        ShuffleBlockIdManager shuffleBlockIdManager =
            shuffleServer
                .getShuffleTaskManager()
                .getShuffleTaskInfo(appId)
                .getShuffleBlockIdManager();
        long blockCountByShuffleId =
            shuffleBlockIdManager.getBlockCountByShuffleId(appId, Lists.newArrayList(shuffleId));
        if (blockCountByShuffleId != 0) {
          LOG.error(
              "Metadata is not deleted on clearing previous stage attempt data for app: {}, "
                  + "shuffleId: {}, stageAttemptNumber: {}",
              appId,
              shuffleId,
              lastAttemptNumber);
          return Optional.of(StatusCode.INTERNAL_ERROR);
        }
      } catch (Exception e) {
        LOG.error(
            "Errors on clearing previous stage attempt data for app: {}, shuffleId: {}, "
                + "stageAttemptNumber: {}",
            appId,
            shuffleId,
            lastAttemptNumber,
            e);
        return Optional.of(StatusCode.INTERNAL_ERROR);
      }
    }
    return Optional.empty();
  }

  /**
   * Records {@code code} on the audit context and sends it as the single registerShuffle reply.
   *
   * @param auditContext audit context of the current call
   * @param responseObserver gRPC observer to complete
   * @param code status to report to the client
   */
  private static void replyRegisterShuffle(
      ServerRpcAuditContext auditContext,
      StreamObserver<ShuffleRegisterResponse> responseObserver,
      StatusCode code) {
    auditContext.withStatusCode(code);
    responseObserver.onNext(ShuffleRegisterResponse.newBuilder().setStatus(code.toProto()).build());
    responseObserver.onCompleted();
  }