in client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java [686:758]
/**
 * Sends one {@code Revive} request covering all {@code requests} to the lifecycle manager and
 * applies the response to this client's local state.
 *
 * <p>Side effects visible in this method:
 *
 * <ul>
 *   <li>every ended map id in the response is added to {@code mapperEndMap} for the shuffle;
 *   <li>for a partition reported as "old available", the worker of the location the request was
 *       issued against is removed from {@code pushExcludedWorkers};
 *   <li>for a partition with status {@code SUCCESS}, the new location is stored in the cached
 *       location map of the shuffle and its worker (and peer, if any) is removed from {@code
 *       pushExcludedWorkers};
 *   <li>on {@code STAGE_ENDED} the shuffle id is added to {@code stageEndShuffleSet}.
 * </ul>
 *
 * @param shuffleId shuffle the partitions belong to
 * @param mapIds map ids forwarded unchanged in the {@code Revive} message
 * @param requests revive requests to send; each contributes its {@code partitionId} and
 *     {@code loc}
 * @return a map of partitionId to {@code StatusCode#getValue} for the partitions processed; when
 *     a partition reports {@code STAGE_ENDED} the map is returned immediately and contains only
 *     the partitions processed before it; {@code null} when a partition reports {@code
 *     SHUFFLE_NOT_REGISTERED} or when any exception is raised while asking or applying the
 *     response
 */
Map<Integer, Integer> reviveBatch(
    int shuffleId, Set<Integer> mapIds, Collection<ReviveRequest> requests) {
  // partitionId -> StatusCode#getValue
  Map<Integer, Integer> results = new HashMap<>();

  // Local cached map of (partitionId -> PartitionLocation)
  ConcurrentHashMap<Integer, PartitionLocation> partitionLocationMap =
      reducePartitionMap.get(shuffleId);

  // partitionId -> location the revive request was issued against. The value is stored as-is,
  // so it is null whenever the request carried no location.
  Map<Integer, PartitionLocation> oldLocMap = new HashMap<>();
  for (ReviveRequest req : requests) {
    oldLocMap.put(req.partitionId, req.loc);
  }

  try {
    PbChangeLocationResponse response =
        lifecycleManagerRef.askSync(
            Revive$.MODULE$.apply(shuffleId, mapIds, requests),
            conf.clientRpcRequestPartitionLocationRpcAskTimeout(),
            ClassTag$.MODULE$.apply(PbChangeLocationResponse.class));

    for (int i = 0; i < response.getEndedMapIdCount(); i++) {
      int mapId = response.getEndedMapId(i);
      mapperEndMap.computeIfAbsent(shuffleId, (id) -> ConcurrentHashMap.newKeySet()).add(mapId);
    }

    for (int i = 0; i < response.getPartitionInfoCount(); i++) {
      PbChangeLocationPartitionInfo partitionInfo = response.getPartitionInfo(i);
      int partitionId = partitionInfo.getPartitionId();
      int statusCode = partitionInfo.getStatus();

      if (partitionInfo.getOldAvailable()) {
        PartitionLocation oldLoc = oldLocMap.get(partitionId);
        // Currently, revive only check if main location available, here won't remove peer loc.
        // oldLoc is null when the request had no location or the response names a partition
        // that was not requested; skip it instead of letting an NPE discard the whole batch.
        if (oldLoc != null) {
          pushExcludedWorkers.remove(oldLoc.hostAndPushPort());
        }
      }

      if (StatusCode.SUCCESS.getValue() == statusCode) {
        PartitionLocation loc =
            PbSerDeUtils.fromPbPartitionLocation(partitionInfo.getPartition());
        partitionLocationMap.put(partitionId, loc);
        pushExcludedWorkers.remove(loc.hostAndPushPort());
        if (loc.hasPeer()) {
          pushExcludedWorkers.remove(loc.getPeer().hostAndPushPort());
        }
      } else if (StatusCode.STAGE_ENDED.getValue() == statusCode) {
        // Stop processing: the status of this and any later partition is not recorded.
        stageEndShuffleSet.add(shuffleId);
        return results;
      } else if (StatusCode.SHUFFLE_NOT_REGISTERED.getValue() == statusCode) {
        logger.error("SHUFFLE_NOT_REGISTERED!");
        return null;
      }
      results.put(partitionId, statusCode);
    }

    return results;
  } catch (Exception e) {
    // Collect the ids and epochs of every request so the failure can be traced from the log.
    StringBuilder partitionIds = new StringBuilder();
    StringBuilder epochs = new StringBuilder();
    requests.forEach(
        (req) -> {
          partitionIds.append(req.partitionId).append(",");
          epochs.append(req.epoch).append(",");
        });
    logger.error(
        "Exception raised while reviving for shuffle {} partitionIds {} epochs {}.",
        shuffleId,
        partitionIds,
        epochs,
        e);
    return null;
  }
}