public Roaring64NavigableMap fetchAllRssTaskIds()

in client-mr/core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/RssEventFetcher.java [65:113]


  public Roaring64NavigableMap fetchAllRssTaskIds() {
    try {
      acceptMapCompletionEvents();
    } catch (Exception e) {
      throw new RssException(
          "Reduce: " + reduce + " fails to accept completion events due to: " + e.getMessage());
    }

    Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
    Roaring64NavigableMap mapIndexBitmap = Roaring64NavigableMap.bitmapOf();
    String errMsg = "TaskAttemptIDs are inconsistent with map tasks";
    for (TaskAttemptID taskAttemptID : successMaps) {
      if (!obsoleteMaps.contains(taskAttemptID)) {
        long rssTaskId = RssMRUtils.convertTaskAttemptIdToLong(taskAttemptID, appAttemptId);
        int mapIndex = taskAttemptID.getTaskID().getId();
        // There can be multiple successful attempts on same map task.
        // So we only need to accept one of them.
        if (!mapIndexBitmap.contains(mapIndex)) {
          taskIdBitmap.addLong(rssTaskId);
          if (mapIndex < totalMapsCount) {
            mapIndexBitmap.addLong(mapIndex);
          } else {
            LOG.error(taskAttemptID + " has overflowed mapIndex");
            throw new IllegalStateException(errMsg);
          }
        } else {
          LOG.warn(taskAttemptID + " is redundant on index: " + mapIndex);
        }
      } else {
        LOG.warn(taskAttemptID + " is successful but cancelled by obsolete event");
      }
    }
    // each map should have only one success attempt
    if (mapIndexBitmap.getLongCardinality() != taskIdBitmap.getLongCardinality()) {
      throw new IllegalStateException(errMsg);
    }
    if (tipFailedCount != 0) {
      LOG.warn("There are " + tipFailedCount + " tipFailed tasks");
    }
    if (taskIdBitmap.getLongCardinality() + tipFailedCount != totalMapsCount) {
      for (int index = 0; index < totalMapsCount; index++) {
        if (!mapIndexBitmap.contains(index)) {
          LOG.error("Fail to fetch " + " map task on index: " + index);
        }
      }
      throw new IllegalStateException(errMsg);
    }
    return taskIdBitmap;
  }