private Roaring64NavigableMap getExpectedTasksByExecutorId()

in client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java [635:711]


  private Roaring64NavigableMap getExpectedTasksByExecutorId(
      int shuffleId, int startPartition, int endPartition, int startMapIndex, int endMapIndex) {
    Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
    Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>> mapStatusIter = null;
    // Spark 3.1 refactored the interface of getMapSizesByExecutorId, so we
    // resolve it reflectively and fall back across signatures for
    // compatibility with Spark 3.0, 3.1, and 3.2.
    try {
      // First, try Spark 3.1's five-argument signature.
      mapStatusIter =
          (Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>>)
              SparkEnv.get()
                  .mapOutputTracker()
                  .getClass()
                  .getDeclaredMethod(
                      "getMapSizesByExecutorId",
                      int.class,
                      int.class,
                      int.class,
                      int.class,
                      int.class)
                  .invoke(
                      SparkEnv.get().mapOutputTracker(),
                      shuffleId,
                      startMapIndex,
                      endMapIndex,
                      startPartition,
                      endPartition);
    } catch (Exception ignored) {
      // Fall back to Spark 3.0's three-argument signature.
      try {
        mapStatusIter =
            (Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>>)
                SparkEnv.get()
                    .mapOutputTracker()
                    .getClass()
                    .getDeclaredMethod("getMapSizesByExecutorId", int.class, int.class, int.class)
                    .invoke(
                        SparkEnv.get().mapOutputTracker(), shuffleId, startPartition, endPartition);
      } catch (Exception ignored1) {
        try {
          // Finally, try Spark 3.2.0's API. Spark releases are versioned
          // [MAJOR].[FEATURE].[MAINTENANCE], and usually adapting to each
          // [MAJOR].[FEATURE] is enough. Unfortunately, some interfaces were
          // mistakenly removed in Spark 3.2.0 and only restored in Spark
          // 3.2.1, so 3.2.0 needs its own branch: the method is looked up on
          // the MapOutputTracker base class itself, because getDeclaredMethod
          // on the runtime subclass does not see inherited declarations.
          mapStatusIter =
              (Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>>)
                  MapOutputTracker.class
                      .getDeclaredMethod(
                          "getMapSizesByExecutorId",
                          int.class,
                          int.class,
                          int.class,
                          int.class,
                          int.class)
                      .invoke(
                          SparkEnv.get().mapOutputTracker(),
                          shuffleId,
                          startMapIndex,
                          endMapIndex,
                          startPartition,
                          endPartition);
        } catch (Exception e) {
          throw new RssException(e);
        }
      }
    }
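    // Each BlockManagerId's topologyInfo carries the taskAttemptId as a
    // string; if it is absent, the map output cannot be attributed to a
    // task attempt.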
    while (mapStatusIter.hasNext()) {
      Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>> tuple2 = mapStatusIter.next();
      if (!tuple2._1().topologyInfo().isDefined()) {
        throw new RssException("Can't get expected taskAttemptId");
      }
      taskIdBitmap.add(Long.parseLong(tuple2._1().topologyInfo().get()));
    }
    return taskIdBitmap;
  }
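
For reference, below is a minimal, self-contained sketch of the same
probe-signatures-via-reflection pattern in isolation. Everything in it
(ReflectiveDispatchSketch, VersionedApi, lookup, the argument values) is
hypothetical and for illustration only; none of it is Uniffle or Spark code.

  import java.lang.reflect.Method;

  public class ReflectiveDispatchSketch {

    // Stand-in for a class whose method signature changed between versions.
    // Only the three-int variant "exists" here, mimicking an older release.
    static class VersionedApi {
      public String lookup(int shuffleId, int startPartition, int endPartition) {
        return "3-arg variant: " + shuffleId + "/" + startPartition + "/" + endPartition;
      }
    }

    public static void main(String[] args) throws Exception {
      VersionedApi api = new VersionedApi();
      // Candidate signatures, newest first, mirroring the method above.
      Class<?>[][] signatures = {
        {int.class, int.class, int.class, int.class, int.class},
        {int.class, int.class, int.class},
      };
      Object[][] arguments = {
        {0, 0, Integer.MAX_VALUE, 0, 1}, // shuffleId, startMapIndex, endMapIndex, startPartition, endPartition
        {0, 0, 1}, // shuffleId, startPartition, endPartition
      };
      Object result = null;
      for (int i = 0; i < signatures.length; i++) {
        try {
          Method m = api.getClass().getDeclaredMethod("lookup", signatures[i]);
          result = m.invoke(api, arguments[i]);
          break; // the first signature that resolves wins
        } catch (NoSuchMethodException ignored) {
          // This signature is absent in the current "version"; try the next.
        }
      }
      System.out.println(result); // prints "3-arg variant: 0/0/1"
    }
  }

Catching only NoSuchMethodException makes the fallback intent explicit; the
production method above catches Exception broadly, which additionally covers
access and invocation failures.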