public MutableShuffleHandleInfo reassignOnBlockSendFailure()

in client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java [1001:1122]


  /**
   * Reassigns replacement shuffle servers for partitions whose block sends failed and returns the
   * updated handle of the given shuffle.
   *
   * <p>For every (partition, failed server) pair, replacements already recorded in the handle are
   * reused; when none are recorded, new ones are requested through {@code requestReassignServer}.
   * When recorded replacements were reused, the servers returned by the assignment update are
   * registered for that partition through {@code registerShuffleServers}. Freshly requested
   * replacements are not registered by this method (presumably {@code requestReassignServer}
   * already does that -- confirm against its implementation).
   *
   * <p>The whole update, including the remote requests, runs while holding the monitor of the
   * resolved handle.
   *
   * @param stageId passed through to {@code requestReassignServer}
   * @param stageAttemptNumber passed through to {@code requestReassignServer}
   * @param shuffleId the shuffle whose handle is looked up and updated
   * @param partitionToFailureServers partition id -> servers that failed to receive its blocks
   * @param partitionSplit {@code true} to treat the failures as partition splits: the per-partition
   *     reassign-server limit check is skipped and replacements are looked up per partition
   * @return the mutable handle after the assignment update
   * @throws RssException if no mutable handle can be resolved for {@code shuffleId}
   */
  public MutableShuffleHandleInfo reassignOnBlockSendFailure(
      int stageId,
      int stageAttemptNumber,
      int shuffleId,
      Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers,
      boolean partitionSplit) {
    long startTime = System.currentTimeMillis();

    // The stored handle is either mutable itself or a stage-attempt wrapper around the current
    // mutable handle; anything else (including a missing handle) leaves internalHandle null.
    ShuffleHandleInfo handleInfo = shuffleHandleInfoManager.get(shuffleId);
    MutableShuffleHandleInfo internalHandle = null;
    if (handleInfo instanceof MutableShuffleHandleInfo) {
      internalHandle = (MutableShuffleHandleInfo) handleInfo;
    } else if (handleInfo instanceof StageAttemptShuffleHandleInfo) {
      internalHandle =
          (MutableShuffleHandleInfo) ((StageAttemptShuffleHandleInfo) handleInfo).getCurrent();
    }
    if (internalHandle == null) {
      throw new RssException(
          "An unexpected error occurred: internalHandle is null, which should not happen");
    }

    synchronized (internalHandle) {
      // If the reassignment servers for one partition exceeds the max reassign server num,
      // it should fast fail.
      if (!partitionSplit) {
        // Do not check the partition reassign server num for partition split case
        internalHandle.checkPartitionReassignServerNum(
            partitionToFailureServers.keySet(), partitionReassignMaxServerNum);
      }

      // Number of servers to request per split partition. It does not depend on the partition or
      // the failed server, so it is computed once instead of once per loop iteration.
      // todo: must ensure in the load_balance mode, the same partition will not trigger multi
      // reassignments.
      int requireServerNum = 1;
      if (partitionSplit && partitionSplitMode == PartitionSplitMode.LOAD_BALANCE) {
        requireServerNum = partitionSplitLoadBalanceServerNum;
      }

      // Replacement server -> partitions that still have to be registered on it.
      Map<ShuffleServerInfo, List<PartitionRange>> newServerToPartitions = new HashMap<>();
      // receivingFailureServer -> partitionId -> replacementServerIds. For logging
      Map<String, Map<Integer, Set<String>>> reassignResult = new HashMap<>();

      for (Map.Entry<Integer, List<ReceivingFailureServer>> entry :
          partitionToFailureServers.entrySet()) {
        int partitionId = entry.getKey();
        for (ReceivingFailureServer receivingFailureServer : entry.getValue()) {
          String serverId = receivingFailureServer.getServerId();

          // True when replacements recorded earlier are reused instead of requesting new ones.
          boolean serverHasReplaced = false;

          Set<ShuffleServerInfo> updatedReassignServers;
          if (!partitionSplit) {
            Set<ShuffleServerInfo> replacements = internalHandle.getReplacements(serverId);
            if (CollectionUtils.isEmpty(replacements)) {
              replacements =
                  requestReassignServer(
                      stageId,
                      stageAttemptNumber,
                      shuffleId,
                      internalHandle,
                      partitionId,
                      serverId);
            } else {
              serverHasReplaced = true;
            }
            updatedReassignServers =
                internalHandle.updateAssignment(partitionId, serverId, replacements);
          } else {
            Set<ShuffleServerInfo> replacements =
                internalHandle.getReplacementsForPartition(partitionId, serverId);
            if (CollectionUtils.isEmpty(replacements)) {
              replacements =
                  requestReassignServer(
                      stageId,
                      stageAttemptNumber,
                      shuffleId,
                      internalHandle,
                      partitionId,
                      serverId,
                      requireServerNum);
            } else {
              serverHasReplaced = true;
            }
            updatedReassignServers =
                internalHandle.updateAssignmentOnPartitionSplit(
                    partitionId, serverId, replacements);
          }

          if (!updatedReassignServers.isEmpty()) {
            reassignResult
                .computeIfAbsent(serverId, x -> new HashMap<>())
                .computeIfAbsent(partitionId, x -> new HashSet<>())
                .addAll(
                    updatedReassignServers.stream()
                        .map(ShuffleServerInfo::getId)
                        .collect(Collectors.toSet()));

            // Only reused replacements are collected for registration below; freshly requested
            // ones are skipped here (presumably already registered by requestReassignServer).
            if (serverHasReplaced) {
              for (ShuffleServerInfo serverInfo : updatedReassignServers) {
                newServerToPartitions
                    .computeIfAbsent(serverInfo, x -> new ArrayList<>())
                    .add(new PartitionRange(partitionId, partitionId));
              }
            }
          }
        }
      }

      if (!newServerToPartitions.isEmpty()) {
        LOG.info(
            "Register the new partition->servers assignment on reassign. {}",
            newServerToPartitions);
        registerShuffleServers(
            getAppId(), shuffleId, newServerToPartitions, getRemoteStorageInfo());
      }

      LOG.info(
          "Finished reassignOnBlockSendFailure request and cost {}(ms). is partition split:{}. Reassign result: {}",
          System.currentTimeMillis() - startTime,
          partitionSplit,
          reassignResult);

      return internalHandle;
    }
  }