in master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java [383:491]
/**
 * Restores the in-memory cluster metadata from a snapshot file.
 *
 * <p>The file is first parsed as a {@code PbSnapshotMetaInfo}. Only after parsing succeeds is the
 * current state cleared via {@code cleanUpState()} and repopulated from the snapshot, so a file
 * that cannot be parsed leaves the existing state untouched. A failure that happens after the
 * state has been cleared is not rolled back by this method.
 *
 * @param file snapshot file to read
 * @throws IOException if reading, parsing or applying the snapshot fails; any exception raised
 *     while restoring is wrapped as the cause
 */
public void restoreMetaFromFile(File file) throws IOException {
  try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(file))) {
    PbSnapshotMetaInfo snapshotMetaInfo = PbSnapshotMetaInfo.parseFrom(in);
    // Clear existing state only once the snapshot has been parsed successfully.
    cleanUpState();
    estimatedPartitionSize = snapshotMetaInfo.getEstimatedPartitionSize();

    // Rebuild the appId -> shuffleIds mapping from the flat list of shuffle keys.
    for (String shuffleKey : snapshotMetaInfo.getRegisteredShuffleList()) {
      Tuple2<String, Object> appIdShuffleId = Utils.splitShuffleKey(shuffleKey);
      registeredAppAndShuffles
          .computeIfAbsent(appIdShuffleId._1, v -> new HashSet<>())
          .add((Integer) appIdShuffleId._2);
    }

    hostnameSet.addAll(snapshotMetaInfo.getHostnameSetList());
    excludedWorkers.addAll(
        snapshotMetaInfo.getExcludedWorkersList().stream()
            .map(PbSerDeUtils::fromPbWorkerInfo)
            .collect(Collectors.toSet()));
    manuallyExcludedWorkers.addAll(
        snapshotMetaInfo.getManuallyExcludedWorkersList().stream()
            .map(PbSerDeUtils::fromPbWorkerInfo)
            .collect(Collectors.toSet()));
    workerLostEvents.addAll(
        snapshotMetaInfo.getWorkerLostEventsList().stream()
            .map(PbSerDeUtils::fromPbWorkerInfo)
            .collect(Collectors.toSet()));

    appHeartbeatTime.putAll(snapshotMetaInfo.getAppHeartbeatTimeMap());
    // Applications that have registered shuffles but no heartbeat in the snapshot get the
    // current time as their heartbeat.
    for (String appId : registeredAppAndShuffles.keySet()) {
      appHeartbeatTime.putIfAbsent(appId, System.currentTimeMillis());
    }

    Set<WorkerInfo> workerInfoSet =
        snapshotMetaInfo.getWorkersList().stream()
            .map(PbSerDeUtils::fromPbWorkerInfo)
            .collect(Collectors.toSet());
    // Only workers still on the default rack need their location resolved again.
    List<String> workerHostList =
        workerInfoSet.stream()
            .filter(w -> NetworkTopology.DEFAULT_RACK.equals(w.networkLocation()))
            .map(WorkerInfo::host)
            .collect(Collectors.toList());
    scala.collection.immutable.Map<String, Node> resolveMap =
        rackResolver.resolveToMap(workerHostList);
    // Reset worker's network location with current master's configuration. This is done in a
    // plain loop rather than in Stream.peek, which is not meant for mutating elements.
    for (WorkerInfo workerInfo : workerInfoSet) {
      if (NetworkTopology.DEFAULT_RACK.equals(workerInfo.networkLocation())) {
        // NOTE(review): Option.get() throws if the resolver returned no entry for this host;
        // this assumes resolveToMap covers every requested host - confirm.
        workerInfo.networkLocation_$eq(
            resolveMap.get(workerInfo.host()).get().getNetworkLocation());
      }
    }
    workersMap.putAll(
        workerInfoSet.stream().collect(Collectors.toMap(WorkerInfo::toUniqueId, w -> w)));

    snapshotMetaInfo
        .getLostWorkersMap()
        .forEach((key, value) -> lostWorkers.put(WorkerInfo.fromUniqueId(key), value));
    snapshotMetaInfo
        .getWorkerEventInfosMap()
        .forEach(
            (key, value) ->
                workerEventInfos.put(
                    WorkerInfo.fromUniqueId(key), PbSerDeUtils.fromPbWorkerEventInfo(value)));
    shutdownWorkers.addAll(
        snapshotMetaInfo.getShutdownWorkersList().stream()
            .map(PbSerDeUtils::fromPbWorkerInfo)
            .collect(Collectors.toSet()));
    decommissionWorkers.addAll(
        snapshotMetaInfo.getDecommissionWorkersList().stream()
            .map(PbSerDeUtils::fromPbWorkerInfo)
            .collect(Collectors.toSet()));

    partitionTotalWritten.add(snapshotMetaInfo.getPartitionTotalWritten());
    partitionTotalFileCount.add(snapshotMetaInfo.getPartitionTotalFileCount());
    shuffleTotalCount.add(snapshotMetaInfo.getShuffleTotalCount());
    addShuffleFallbackCounts(snapshotMetaInfo.getShuffleFallbackCountsMap());
    snapshotMetaInfo
        .getApplicationMetasMap()
        .forEach(
            (key, value) -> applicationMetas.put(key, PbSerDeUtils.fromPbApplicationMeta(value)));

    // Computed last, after workersMap and the other worker collections have been restored.
    availableWorkers.addAll(
        workersMap.values().stream()
            .filter(worker -> isWorkerAvailable(worker))
            .collect(Collectors.toSet()));
  } catch (Exception e) {
    throw new IOException(e);
  }
  LOG.info("Successfully restore meta info from snapshot {}", file.getAbsolutePath());
  LOG.info(
      "Worker size: {}, Registered shuffle size: {}. Worker excluded list size: {}. Manually Excluded list size: {}",
      workersMap.size(),
      registeredAppAndShuffles.size(),
      excludedWorkers.size(),
      manuallyExcludedWorkers.size());
  workersMap.values().forEach(workerInfo -> LOG.info(workerInfo.toString()));
  registeredAppAndShuffles.forEach(
      (appId, shuffleIds) -> LOG.info("RegisteredShuffle {}-{}", appId, shuffleIds));
}