Map reviveBatch()

in client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java [686:758]


  /**
   * Sends one batched revive request for {@code shuffleId} to the LifecycleManager and applies the
   * reply to the client's local state.
   *
   * <p>While processing the reply this method:
   *
   * <ul>
   *   <li>records every map id reported as ended in {@code mapperEndMap};
   *   <li>for each partition revived with {@code SUCCESS}, caches the new location in the
   *       shuffle's entry of {@code reducePartitionMap} and removes the new location's worker (and
   *       its peer's, if any) from {@code pushExcludedWorkers};
   *   <li>when the reply marks a partition's old location as available, removes that old worker
   *       from {@code pushExcludedWorkers}.
   * </ul>
   *
   * @param shuffleId shuffle whose partitions are being revived
   * @param mapIds map ids passed along in the {@code Revive} message
   * @param requests revive requests; each contributes its {@code partitionId} and the old {@code
   *     loc} it was pushing to
   * @return partitionId -> {@code StatusCode#getValue()} for each partition processed. If a
   *     partition reports {@code STAGE_ENDED}, the shuffle is added to {@code stageEndShuffleSet}
   *     and only the results collected before that partition are returned. Returns {@code null} if
   *     a partition reports {@code SHUFFLE_NOT_REGISTERED} or if the request fails with an
   *     exception.
   */
  Map<Integer, Integer> reviveBatch(
      int shuffleId, Set<Integer> mapIds, Collection<ReviveRequest> requests) {
    // partitionId -> StatusCode#getValue
    Map<Integer, Integer> results = new HashMap<>();

    // Local cached map of (partitionId -> PartitionLocation)
    ConcurrentHashMap<Integer, PartitionLocation> partitionLocationMap =
        reducePartitionMap.get(shuffleId);

    // partitionId -> location the caller was pushing to when it asked for the revive.
    Map<Integer, PartitionLocation> oldLocMap = new HashMap<>();
    for (ReviveRequest req : requests) {
      oldLocMap.put(req.partitionId, req.loc);
    }
    try {
      PbChangeLocationResponse response =
          lifecycleManagerRef.askSync(
              Revive$.MODULE$.apply(shuffleId, mapIds, requests),
              conf.clientRpcRequestPartitionLocationRpcAskTimeout(),
              ClassTag$.MODULE$.apply(PbChangeLocationResponse.class));

      for (int i = 0; i < response.getEndedMapIdCount(); i++) {
        int mapId = response.getEndedMapId(i);
        mapperEndMap.computeIfAbsent(shuffleId, (id) -> ConcurrentHashMap.newKeySet()).add(mapId);
      }

      for (int i = 0; i < response.getPartitionInfoCount(); i++) {
        PbChangeLocationPartitionInfo partitionInfo = response.getPartitionInfo(i);
        int partitionId = partitionInfo.getPartitionId();
        int statusCode = partitionInfo.getStatus();
        if (partitionInfo.getOldAvailable()) {
          PartitionLocation oldLoc = oldLocMap.get(partitionId);
          // Revive currently only checks whether the main location is available, so the peer
          // location is not un-excluded here. Un-excluding is best-effort: if we have no old
          // location for this partition (not in the batch, or sent with a null loc), skip it
          // instead of letting an NPE abort the whole batch and discard the revived locations.
          if (oldLoc != null) {
            pushExcludedWorkers.remove(oldLoc.hostAndPushPort());
          }
        }

        if (StatusCode.SUCCESS.getValue() == statusCode) {
          PartitionLocation loc =
              PbSerDeUtils.fromPbPartitionLocation(partitionInfo.getPartition());
          partitionLocationMap.put(partitionId, loc);
          pushExcludedWorkers.remove(loc.hostAndPushPort());
          if (loc.hasPeer()) {
            pushExcludedWorkers.remove(loc.getPeer().hostAndPushPort());
          }
        } else if (StatusCode.STAGE_ENDED.getValue() == statusCode) {
          // The stage is over: mark the shuffle and return what has been collected so far.
          stageEndShuffleSet.add(shuffleId);
          return results;
        } else if (StatusCode.SHUFFLE_NOT_REGISTERED.getValue() == statusCode) {
          logger.error("SHUFFLE_NOT_REGISTERED!");
          return null;
        }
        results.put(partitionId, statusCode);
      }

      return results;
    } catch (Exception e) {
      StringBuilder partitionIds = new StringBuilder();
      StringBuilder epochs = new StringBuilder();
      requests.forEach(
          (req) -> {
            partitionIds.append(req.partitionId).append(",");
            epochs.append(req.epoch).append(",");
          });
      // The trailing exception argument has no placeholder, so SLF4J logs it as the throwable.
      logger.error(
          "Exception raised while reviving for shuffle {} partitionIds {} epochs {}.",
          shuffleId,
          partitionIds,
          epochs,
          e);
      return null;
    }
  }