in client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java [635:711]
/**
 * Collects the task ids recorded in the map statuses of the given shuffle.
 *
 * <p>The map statuses are obtained by calling {@code getMapSizesByExecutorId} on the current
 * {@code MapOutputTracker} through reflection, because the signature differs between Spark 3.0,
 * 3.1 and 3.2.0. The candidate overloads are looked up first, and only the one that is found is
 * invoked, exactly once. A failure raised while the tracker method runs is therefore reported
 * as-is instead of being mistaken for "method not available" and hidden behind a fallback.
 *
 * <p>Every returned {@code BlockManagerId} must carry a {@code topologyInfo} value, which is
 * parsed as a long (per the error message below it is expected to be the task attempt id).
 *
 * @param shuffleId id of the shuffle to query
 * @param startPartition first partition argument forwarded to the tracker
 * @param endPartition last partition argument forwarded to the tracker
 * @param startMapIndex first map index argument forwarded to the tracker (unused by the Spark
 *     3.0 overload)
 * @param endMapIndex last map index argument forwarded to the tracker (unused by the Spark 3.0
 *     overload)
 * @return bitmap containing every parsed task id
 * @throws RssException if no supported overload exists, if the tracker call fails, or if a
 *     {@code BlockManagerId} has no {@code topologyInfo}
 */
private Roaring64NavigableMap getExpectedTasksByExecutorId(
    int shuffleId, int startPartition, int endPartition, int startMapIndex, int endMapIndex) {
  final String methodName = "getMapSizesByExecutorId";
  final MapOutputTracker tracker = SparkEnv.get().mapOutputTracker();
  final Class<?>[] rangeTypes = {int.class, int.class, int.class, int.class, int.class};
  final Object[] rangeArgs = {shuffleId, startMapIndex, endMapIndex, startPartition, endPartition};

  // Since Spark 3.1 refactors the interface of getMapSizesByExecutorId,
  // we use reflection for the compatibility with 3.0 & 3.1 & 3.2.
  // Only lookup failures trigger a fallback; the chosen method is invoked once further down.
  java.lang.reflect.Method target;
  Object[] args = rangeArgs;
  try {
    // attempt to use Spark 3.1's API
    target = tracker.getClass().getDeclaredMethod(methodName, rangeTypes);
  } catch (NoSuchMethodException | SecurityException notSpark31) {
    try {
      // fallback and attempt to use Spark 3.0's API
      target = tracker.getClass().getDeclaredMethod(methodName, int.class, int.class, int.class);
      args = new Object[] {shuffleId, startPartition, endPartition};
    } catch (NoSuchMethodException | SecurityException notSpark30) {
      try {
        // attempt to use Spark 3.2.0's API
        // Each Spark release will be versioned: [MAJOR].[FEATURE].[MAINTENANCE].
        // Usually we only need to adapt [MAJOR].[FEATURE]. Unfortunately,
        // some interfaces were removed wrongly in Spark 3.2.0 and were added back in
        // Spark 3.2.1, so we need to adapt Spark 3.2.0 here by looking the method up on
        // MapOutputTracker itself instead of on the runtime class.
        target = MapOutputTracker.class.getDeclaredMethod(methodName, rangeTypes);
      } catch (NoSuchMethodException | SecurityException e) {
        throw new RssException(e);
      }
    }
  }

  Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>> mapStatusIter;
  try {
    mapStatusIter =
        (Iterator<Tuple2<BlockManagerId, Seq<Tuple3<BlockId, Object, Object>>>>)
            target.invoke(tracker, args);
  } catch (ReflectiveOperationException | RuntimeException e) {
    // The overload exists but the call itself failed: surface the real cause.
    throw new RssException(e);
  }

  Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf();
  while (mapStatusIter.hasNext()) {
    BlockManagerId blockManagerId = mapStatusIter.next()._1();
    if (!blockManagerId.topologyInfo().isDefined()) {
      throw new RssException("Can't get expected taskAttemptId");
    }
    taskIdBitmap.add(Long.parseLong(blockManagerId.topologyInfo().get()));
  }
  return taskIdBitmap;
}